feat: streaming per-period Icechunk ingest — full #64 implementation#144
Draft
turban wants to merge 81 commits into
Pull request overview
Implements the “streaming per-period ingest” architecture for Icechunk-backed Zarr v3 stores by introducing an IngestionPlugin protocol + orchestrator loop, adding built-in plugins (ERA5-Land, CHIRPS3, WorldPop), and updating the API surface (STAC, /zarr, resample, publications, sync) to recognize ArtifactFormat.ICECHUNK.
Changes:
- Add new ingest subsystem (`climate_api.ingest.*`) for per-period streaming writes, resume/cursor support, and optional post-ingest rechunking.
- Add built-in ingestion plugins for ERA5-Land, CHIRPS3, and WorldPop; update built-in dataset templates to reference `ingestion.plugin`.
- Extend serving/discovery/processing paths (STAC, `/zarr`, resample, publications, sync) to handle `ICECHUNK` artifacts, and add substantial unit test coverage + documentation updates.
Reviewed changes
Copilot reviewed 39 out of 40 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Adds Icechunk dependency lock entry. |
| pyproject.toml | Adds icechunk>=2.0,<3 runtime dependency. |
| climate_api/ingestions/schemas.py | Introduces ArtifactFormat.ICECHUNK. |
| climate_api/ingest/protocol.py | Defines GridSpec and IngestionPlugin protocol. |
| climate_api/ingest/orchestrator.py | Implements async per-period ingest loop and sync wrapper. |
| climate_api/ingest/store.py | Adds Icechunk repo open/create, committed-period read, and rechunk helper. |
| climate_api/ingest/__init__.py | Exposes ingest protocol/orchestrator symbols. |
| climate_api/ingest/plugins/__init__.py | Declares built-in plugin package. |
| climate_api/ingest/plugins/chirps3.py | Adds CHIRPS3 daily plugin implementation. |
| climate_api/ingest/plugins/era5_land.py | Adds ERA5-Land plugin implementation. |
| climate_api/ingest/plugins/worldpop.py | Adds WorldPop yearly plugin implementation. |
| climate_api/data/datasets/chirps3.yaml | Switches template ingestion to plugin + params. |
| climate_api/data/datasets/era5_land.yaml | Switches template ingestion to plugin + params. |
| climate_api/data/datasets/worldpop.yaml | Switches template ingestion to plugin + params. |
| climate_api/data_registry/services/datasets.py | Updates dataset template validation for ingestion changes. |
| climate_api/ingestions/services.py | Routes ingestion + /zarr store listing/serving through Icechunk-aware code paths. |
| climate_api/ingestions/routes.py | Removes extent country_code forwarding; uses updated create_artifact signature. |
| climate_api/ingestions/sync_engine.py | Adds current_end override and ICECHUNK append support changes. |
| climate_api/data_accessor/services/accessor.py | Adds open_icechunk_dataset() and dataset-coverage helper for pre-opened datasets. |
| climate_api/stac/services.py | Adds ICECHUNK eligibility and Icechunk dataset opening in collection builds. |
| climate_api/processing/resample.py | Allows ICECHUNK as a resampling source format. |
| climate_api/publications/services.py | Ensures Icechunk artifacts are served via /zarr (not pygeoapi) and supports axis inspection. |
| climate_api/system/routes.py | Updates ingestion route call signature (no country_code). |
| climate_api/data_manager/services/downloader.py | Removes legacy download route/helpers; retains Zarr build/utility functionality. |
| climate_api/data_manager/routes.py | Removes legacy internal download endpoint. |
| tests/test_ingest_orchestrator.py | Adds orchestrator tests (resume, cursor, cancellation, rechunk, plugin loading). |
| tests/test_ingest_plugins.py | Adds unit tests for CHIRPS3 + WorldPop plugin logic and mocked fetch behavior. |
| tests/test_processing_resample.py | Extends resample tests for Icechunk and netcdf rejection messaging. |
| tests/test_stac.py | Adds STAC tests for Icechunk artifact inclusion + dataset opening path. |
| tests/test_datasets.py | Adds dataset link test for published Icechunk STAC link. |
| tests/test_datasets_sync.py | Updates sync tests to use store-authoritative current_end for Icechunk. |
| tests/test_dataset_registry.py | Updates registry tests to use ingestion.plugin. |
| tests/test_config.py | Updates plugin-dir tests to use ingestion.plugin. |
| tests/test_downloader.py | Removes tests tied to removed legacy download behavior. |
| docs/instance_guide.md | Updates instance plugin guidance for ingestion plugins. |
| docs/extensibility.md | Documents IngestionPlugin as the ingestion extension point. |
| docs/adding_custom_datasets.md | Replaces download-function docs with ingestion-plugin contract and examples. |
| docs/built_in_datasets.md | Updates built-in dataset ingest + sync behavior descriptions for streaming ingest. |
| docs/architecture.md | Updates lifecycle + extension-point architecture to reflect plugin-driven ingest. |
| AGENTS.md | Updates contributor-facing concept summary to reference ingestion plugins. |
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 11 comments.
Comments suppressed due to low confidence (2)
tests/test_ingest_plugins.py:362
- Same issue as the WorldPop helper above: the `.rio` accessor is used without importing `rioxarray` to register it. Add an explicit `import rioxarray` in this test module (or in this helper) before accessing `.rio`.
docs/extensibility.md:96
- Step 5 says the orchestrator “Commits to the Icechunk store after every `commit_batch_size` periods”, but the implementation commits every period and only checkpoints the cursor every `commit_batch_size` periods. This doc should be corrected so plugin authors don’t design around batched commits that won’t happen.
2. Calls `periods()` once to get the full period list; filters against already-committed time coordinates.
3. Creates all fetch tasks upfront so up to `max_concurrency` fetches are in flight simultaneously.
4. Awaits tasks in chronological order so writes are always sequential.
5. Commits to the Icechunk store after every `commit_batch_size` periods.
6. On restart, resumes from the last committed period — a crash loses at most one uncommitted batch.
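Steps 2 to 4 of the excerpt above can be sketched with plain `asyncio`. This is a minimal illustration, not the project's orchestrator: `run_periods`, `fetch`, `write`, and the period IDs are hypothetical stand-ins, and a semaphore is only one assumed way to bound concurrency.

```python
import asyncio


async def run_periods(periods, committed, fetch, write, max_concurrency=4):
    """Fetch pending periods concurrently, but write them in period order."""
    # Filter out periods that are already committed (hypothetical set of IDs).
    pending = [p for p in periods if p not in committed]
    limit = asyncio.Semaphore(max_concurrency)

    async def bounded_fetch(period):
        # At most max_concurrency blocking fetches run in worker threads at once.
        async with limit:
            return await asyncio.to_thread(fetch, period)

    # Create every task upfront; the semaphore throttles the actual work.
    tasks = [asyncio.create_task(bounded_fetch(p)) for p in pending]

    written = []
    for period, task in zip(pending, tasks):
        data = await task  # awaiting in list order keeps writes sequential
        write(period, data)
        written.append(period)
    return written


def demo():
    store = {}
    periods = ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]
    written = asyncio.run(
        run_periods(
            periods,
            committed={"2024-01-01"},
            fetch=lambda p: f"data:{p}",
            write=store.__setitem__,
        )
    )
    return written, store
```

Calling `demo()` skips the already-committed first period and writes the remaining three in chronological order, regardless of which fetch finishes first.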
…plugin
Implements the plugin architecture and orchestration loop from issue #64.
### What's new
**climate_api/ingest/** (new package):
- protocol.py: GridSpec dataclass and IngestionPlugin Protocol
- orchestrator.py: async run_ingest() loop with bounded concurrency, per-batch commits, cursor-based resume, and cancel checkpoint
- store.py: Icechunk repository open/create helpers and read_committed_period_ids() for store-based resume fallback
- plugins/era5_land.py: Era5LandPlugin, which streams hourly ERA5-Land data from DestinE Earth Data Hub via remote zarr; handles 0-360 longitude, per-month parallel fetches, and availability lag
**Dataset registry**: ingestion.plugin is now a valid alternative to ingestion.function; datasets can carry both keys during migration.
**era5_land.yaml**: registers Era5LandPlugin alongside the existing function key so both paths are available.
**ingestions/services.py**: branches on ingestion.plugin: routes ingest requests through run_ingest_sync and registers the Icechunk store as an ArtifactFormat.ICECHUNK artifact.
**accessor.py**: adds coverage_from_open_dataset() for callers that already hold a store handle (e.g. an Icechunk session).
**pyproject.toml**: adds icechunk>=2.0,<3 dependency.
14 tests covering: write all periods, fetch-exactly-once, idempotency, cursor-based resume, progress reporting, batch commit checkpoints, no-op on empty period list, cancellation, sync wrapper, load_plugin, and read_committed_period_ids.
Closes: partially addresses #64 (Tracks 2–4 remain)
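For readers unfamiliar with structural typing, the plugin contract named in this commit might look roughly like the sketch below. The method names `probe`, `periods`, and `fetch_period` and the `extra_dims` field appear elsewhere in this PR; every signature, every other field, and the toy `ConstantYearlyPlugin` are assumptions made for illustration, not the code in `protocol.py`.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class GridSpec:
    # Hypothetical fields; the real dataclass may differ.
    shape: tuple[int, int]
    crs: str = "EPSG:4326"
    extra_dims: dict[str, int] = field(default_factory=dict)


@runtime_checkable
class IngestionPlugin(Protocol):
    # Assumed signatures: only the method names come from the PR text.
    def probe(self) -> GridSpec: ...
    def periods(self, start: str, end: str) -> list[str]: ...
    def fetch_period(self, period_id: str) -> Any: ...


class ConstantYearlyPlugin:
    """Toy plugin: one constant grid per year, no network access."""

    rechunk_time = None  # optional attribute, meant to be read via getattr

    def probe(self) -> GridSpec:
        return GridSpec(shape=(2, 2))

    def periods(self, start: str, end: str) -> list[str]:
        return [str(year) for year in range(int(start), int(end) + 1)]

    def fetch_period(self, period_id: str) -> Any:
        return [[int(period_id)] * 2 for _ in range(2)]
```

Because the protocol is `runtime_checkable`, `isinstance(ConstantYearlyPlugin(), IngestionPlugin)` succeeds without inheritance, which is the property that lets third-party plugins live outside the package.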
Two bugs found during Norway instance testing:
1. Icechunk 2.x sessions do not expose uncommitted writes to subsequent zarr.open_group calls, so to_zarr(append_dim='time') on period N+1 saw an empty store. Fix: open a fresh writable session per period so each append reads the prior committed snapshot. commit_batch_size now controls cursor-save frequency rather than commit frequency.
2. Era5LandPlugin._build_periods ignored the hour component of start/end, so 'start=2024-01-01T00 end=2024-01-01T00' generated all 24 hours. Fix: compare full period-ID strings (lexicographic) after generating.
Add three unit tests for the period-filter fix.
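The period-filter fix relies on zero-padded period IDs sorting lexicographically in chronological order. A minimal sketch of that idea follows; `hourly_period_ids` is a hypothetical helper, not `Era5LandPlugin._build_periods`, and it assumes both bounds carry an hour component.

```python
from datetime import datetime, timedelta


def hourly_period_ids(start: str, end: str) -> list[str]:
    """Return hourly IDs like '2024-01-01T05' between start and end, inclusive."""
    first_day = datetime.strptime(start[:10], "%Y-%m-%d")
    last_day = datetime.strptime(end[:10], "%Y-%m-%d")

    # Generate every hour of every day touched by the range ...
    ids = []
    current = first_day
    while current < last_day + timedelta(days=1):
        ids.append(current.strftime("%Y-%m-%dT%H"))
        current += timedelta(hours=1)

    # ... then trim by comparing full period-ID strings. Zero-padded IDs
    # compare lexicographically in the same order as the timestamps.
    return [p for p in ids if start <= p <= end]
```

With `start` and `end` both set to `2024-01-01T00`, the day-level loop still produces 24 candidates, but the string comparison keeps only the single requested hour.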
ERA5-Land source data carries zarr v2 Blosc codec metadata. When xarray loads the dataset it retains the source encoding, which fails with a BytesBytesCodec type error when writing to a zarr v3/icechunk store. Clearing encoding on all vars and coords before returning from fetch lets the orchestrator write with zarr v3-compatible defaults.
…ct timestamps When encoding is cleared and xarray picks a default unit for the first write it may choose "days since YYYY-MM-DD". Subsequent append_dim writes then encode sub-daily offsets as integer days, shifting all non-midnight hours to the wrong date. Explicitly setting "hours since 1970-01-01" on every fetched period ensures the first write establishes a unit that all appends can faithfully represent as integers.
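The effect of the unit choice can be reproduced without xarray. The sketch below encodes a timestamp as a whole number of units since the epoch; it uses floor division as an assumed stand-in for integer encoding, so the exact rounding xarray applies is not modeled here.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def encode(ts: datetime, unit: timedelta) -> int:
    """Encode a timestamp as a whole number of `unit` steps since the epoch."""
    return (ts - EPOCH) // unit


def decode(value: int, unit: timedelta) -> datetime:
    """Invert encode(); exact only if the timestamp is a multiple of `unit`."""
    return EPOCH + value * unit


ts = datetime(2024, 1, 1, 13)  # 13:00, a non-midnight hour

# Integer days cannot represent 13:00, so the hour is lost on the round trip.
as_days = decode(encode(ts, timedelta(days=1)), timedelta(days=1))

# Integer hours represent every hourly timestamp exactly.
as_hours = decode(encode(ts, timedelta(hours=1)), timedelta(hours=1))
```

Here `as_hours` equals the original timestamp while `as_days` does not, which is why the first write has to establish an hourly unit before any append happens.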
…s source of truth
Three related changes:
1. plan_sync accepts an optional current_end parameter that overrides the artifact metadata's coverage.temporal.end. For Icechunk stores the artifact record may lag behind what is actually committed on disk, so callers pass the store-authoritative value.
2. _supports_append recognises ArtifactFormat.ICECHUNK and always returns True. The orchestrator already handles incremental appends via its own read_committed_period_ids call; no sync.execution: append YAML flag is required for Icechunk datasets.
3. sync_dataset reads read_committed_period_ids directly from the Icechunk store before calling run_sync and passes the result as current_end. This ensures the sync plan reflects the true on-disk state rather than stale metadata.
Additionally, _create_icechunk_artifact now accepts an optional ingest_start parameter. For append syncs this is set to the delta_start so the orchestrator scans only the new period range rather than the full historical range (e.g. from 1950), which avoids enumerating hundreds of thousands of already-committed periods needlessly.
… time: N) Adds rechunk_store(store_path, time_chunk=N) in store.py. The function opens the latest committed snapshot via a readonly session, rewrites all variables with the new time chunk size via a writable session (using dask for lazy chunk-by-chunk processing), and commits the result as a new Icechunk snapshot. MVCC ensures the previous snapshot is preserved if the rechunk fails. run_ingest / run_ingest_sync gain a rechunk_time: int | None parameter. When set, rechunk_store is called after all periods have been committed. Era5LandPlugin declares rechunk_time = 12 (twelve hourly periods per chunk). _create_icechunk_artifact reads this attribute and passes it to run_ingest_sync only on initial ingest (ingest_start is None); sync appends skip rechunking to avoid rewriting the full store on every small update.
CHIRPS3 fetches daily COG files via HTTP range requests — one period per day, capped to the last complete published month (lag ~1–2 months). Four concurrent fetches via thread pool; commits batched at 30 periods (~1/month). WorldPop downloads per-country GeoTIFFs for global2 (2015–2030, 100 m constrained) or global1 (2000–2020). Max concurrency 1 — full-country files can be several hundred MB. Both plugins reuse the probe-estimate pattern (no network call) to derive GridSpec from known pixel resolution. Includes dataset YAML registrations and full unit test coverage (41 tests).
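The probe-estimate pattern mentioned above amounts to arithmetic on the bounding box. A minimal sketch, assuming a regular lat/lon grid: `GridEstimate` and `estimate_grid` are hypothetical names, and the real plugins return a `GridSpec` whose fields are not shown in this PR.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class GridEstimate:
    width: int
    height: int


def estimate_grid(bbox, resolution_deg):
    """Estimate pixel counts for (west, south, east, north) at a fixed resolution."""
    west, south, east, north = bbox
    # round() before ceil() guards against float noise such as 120.00000000000001.
    width = math.ceil(round((east - west) / resolution_deg, 6))
    height = math.ceil(round((north - south) / resolution_deg, 6))
    return GridEstimate(width=width, height=height)
```

For example, a 10° by 6° box at an illustrative 0.05° pixel size gives a 200 by 120 grid, computed without opening any remote file.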
open_icechunk_dataset opens an Icechunk store via a readonly MVCC session
and is wired into every place open_zarr_dataset is used: the publications
provider axes, STAC collection builder, and resample source loader.
The zarr-serving HTTP routes handle ArtifactFormat.ICECHUNK by opening the
store via repo.readonly_session("main") and serving all zarr keys through
session.store[key] — Icechunk's virtual zarr v3 interface. Directory-style
paths enumerate group members via zarr.open_group. Metadata keys are served
as JSON; chunk keys as raw bytes.
STAC link generation and resample source validation both recognise ICECHUNK
alongside ZARR. pygeoapi registration is skipped for Icechunk artifacts
(pygeoapi reads plain zarr directories, not Icechunk repos).
GridSpec gains an extra_dims field for plugins that produce multidimensional
stores (e.g. WorldPop age/sex with dims {age_group: 20, sex: 2}). The
orchestrator does not act on extra_dims yet; the field exposes the intent
to protocol users and prepares for future chunk-shape computation.
The orchestrator calls repo.expire_snapshots(older_than=now) after the
ingest loop (and rechunk pass) completes. Each period write creates one
Icechunk snapshot; for a 35-year daily ingest this produces ~12,775
intermediate snapshots. expire_snapshots marks them as expired without
touching chunk data — the "main" branch ref preserves HEAD. Run
garbage_collect separately to reclaim manifest storage.
The repo is reopened after rechunk_store() so expire_snapshots operates
on the post-rechunk HEAD rather than the stale pre-rechunk repo handle.
- extensibility.md: add IngestionPlugin as a first-class extension point with full protocol reference, GridSpec field table, and a comparison table for choosing between function and plugin approaches.
- adding_custom_datasets.md: update overview to mention both ingestion paths; update ingestion field reference table with ingestion.plugin and ingestion.params; add IngestionPlugin skeleton and dataset template example at end of doc.
- built_in_datasets.md: replace stale "Sync behaviour" descriptions with accurate Ingest method + Sync behaviour paragraphs for CHIRPS3 (COG range requests), ERA5-Land (remote zarr, monthly fetch), and WorldPop (per-country GeoTIFF download). Add country_code note to WorldPop.
- architecture.md: replace single-path data lifecycle diagram with dual-path diagram (function path vs plugin path); add IngestionPlugin to the plugin contract section; update the append execution mode section to distinguish cache-based (function path) from Icechunk-based (plugin path) behaviour.
Drop the legacy NetCDF-file-based ingestion.function download path entirely. All datasets now ingest through IngestionPlugin directly into Icechunk stores.
- Remove ingestion.function and default_params from YAML dataset configs
- Require ingestion.plugin in dataset validation (was optional fallback)
- Remove download_dataset, _validate_spatial_coverage, and related routes
- Remove country_code param from create_artifact/run_sync (was only used by the legacy download path); inject it automatically from extent.country_code into any plugin whose constructor declares the parameter (WorldPop only)
- Update docs (architecture, extensibility, adding_custom_datasets, instance_guide) and AGENTS.md to reflect plugin-only ingestion
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
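Injecting `country_code` only into plugins whose constructor declares it can be done by inspecting the signature. The sketch below is one assumed way to do this with `inspect`; `build_plugin` and both toy classes are hypothetical and do not mirror the project's loader.

```python
import inspect


def build_plugin(plugin_cls, params, extent):
    """Instantiate plugin_cls, adding country_code only if its constructor takes it."""
    kwargs = dict(params)
    accepted = inspect.signature(plugin_cls.__init__).parameters
    country = extent.get("country_code")
    # Explicit params win; the extent value is only a fallback.
    if "country_code" in accepted and country and "country_code" not in kwargs:
        kwargs["country_code"] = country
    return plugin_cls(**kwargs)


class NeedsCountry:
    def __init__(self, country_code, release="global2"):
        self.country_code = country_code
        self.release = release


class NoCountry:
    def __init__(self, variable="tp"):
        self.variable = variable
```

`build_plugin(NeedsCountry, {}, {"country_code": "NOR"})` receives the code, while `NoCountry` is constructed untouched, so callers no longer need to thread `country_code` through every function signature.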
… datasets WorldPop GeoTIFFs carry scale_factor/add_offset in variable attributes. xarray rejects these as conflicting CF encoding keys when appending to a Zarr store on the second period. Strip all CF encoding keys from attrs after loading so append_dim writes succeed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…gs84 Datasets like WorldPop are stored in WGS84 (EPSG:4326) while a deployment may be configured with a projected CRS (e.g. EPSG:32633). Using the deployment CRS as proj:code caused the map viewer to wrongly treat WGS84 coordinates as projected, breaking rendering. spatial_wgs84 coverage was also computed by reprojecting already-WGS84 coordinates as if they were UTM metres. Fix: read the actual CRS from the dataset's spatial_ref coordinate (written by rioxarray) and use it for both proj:code in the STAC collection and native_crs in coverage computation. Falls back to the configured deployment CRS when no spatial_ref is present. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ng paths
- orchestrator.py: update module docstring to say "sync methods" and clarify asyncio.to_thread wrapping; protocol.py: same in IngestionPlugin docstring
- GridSpec docstring: clarify shape is logged but not used for chunking; the field drives GeoZarr attribute writes, not store encoding
- stac/services.py + sync_engine.py: add Path.exists() check before open_or_create_repo() in read-only probe paths. The function creates a new empty Icechunk repo when the path is missing, which is an unintended side-effect during STAC generation and sync planning
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…arify docs
- Remove load_cursor from run_ingest/run_ingest_sync/create_artifact/execute_ingest: resume is always store-based (read_committed_period_ids); the parameter was accepted but never called
- Collapse already_done/done_offset duplicate in orchestrator.py
- ERA5LandPlugin.probe: compute shape from known 0.1° resolution (no network I/O); update _select_bbox to use the constant instead of lazy dask compute. Resolves "metadata-only probe" docstring claim and x_dim/y_dim mismatch
- protocol.py: periods() docstring says it must be pure computation, no I/O; rechunk_time docstring clarifies that omitting the attribute is valid
- WorldPopPlugin: document full-rebuild-on-sync and memory bound in docstring
- Add test_run_ingest_preserves_committed_periods_on_fetch_error: verifies periods committed before a mid-run fetch exception are retained in the store
- Add asyncio.run() event-loop precondition note to run_ingest_sync docstring
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Icechunk is an internal storage detail; the /zarr/ HTTP endpoint always serves standard Zarr to clients regardless of backing format. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Both Ingest and Sync now stream SSE progress events via fetch() instead of blocking until redirect. The manage page shows a progress bar and period counter (N / total) while the job runs, then navigates on completion or surfaces errors inline without a page reload. Backend: on_progress threaded through sync_dataset → run_sync → create_artifact_fn so both code paths emit progress events. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Without Cache-Control: no-cache and a periodic keepalive comment, uvicorn/nginx buffer the text/event-stream response so no events reach the browser until the connection closes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
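The wire format behind this fix is small enough to show directly. The sketch below formats Server-Sent Event frames and a keepalive comment using only the standard library; the function names are hypothetical, and emitting a keepalive after every N updates is a simplification of the time-based keepalive the commit describes.

```python
import json

# Headers the response needs so intermediaries do not cache the stream.
SSE_HEADERS = {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
}


def sse_event(event: str, payload: dict) -> str:
    """Format one Server-Sent Event frame (terminated by a blank line)."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


def sse_keepalive() -> str:
    # Lines starting with ':' are comments: clients ignore them, but they
    # keep bytes flowing through buffering proxies.
    return ": keepalive\n\n"


def progress_stream(updates, keepalive_every=2):
    """Yield progress frames, inserting a keepalive after every N updates."""
    for i, (done, total) in enumerate(updates, start=1):
        yield sse_event("progress", {"done": done, "total": total})
        if i % keepalive_every == 0:
            yield sse_keepalive()
```

A generator like `progress_stream` can be handed to a streaming response together with `SSE_HEADERS`; the browser side then reads `done` and `total` from each `progress` event to drive the period counter.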
Opening the DestinE zarr store (auth handshake + metadata fetch) on every fetch_period call made each of 145+ periods pay the full connection overhead. Cache the opened and longitude-corrected dataset on the instance; subsequent calls reuse it. Double-checked lock guards against concurrent opens when max_concurrency=4. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
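The double-checked lock described here follows a standard shape. A minimal sketch with a hypothetical `CachedSource` wrapper; the `opener` callable stands in for the expensive store open and is not the plugin's actual code.

```python
import threading


class CachedSource:
    """Open an expensive resource once and share it across threads."""

    def __init__(self, opener):
        self._opener = opener
        self._dataset = None
        self._lock = threading.Lock()

    def get(self):
        # First check without the lock: the common, already-open case.
        if self._dataset is None:
            with self._lock:
                # Second check: another thread may have opened it while
                # this one was waiting for the lock.
                if self._dataset is None:
                    self._dataset = self._opener()
        return self._dataset
```

With several fetch threads calling `get()` at once, only the first pays the connection overhead; the rest either wait briefly on the lock or skip it entirely.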
Required by FastAPI for form parsing in the manage routes (await request.form()). Was missing from pyproject.toml, causing install failures in downstream projects. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
xstac crashes with ValueError when the time dimension has only one element (0-d scalar after sel). Expand to a 1-element array so min/max concatenation succeeds. Fixes 503 when viewing a dataset whose first ingest batch has only committed a single period. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A date-only end like '2026-01-01' was normalizing to T00, causing hourly ingests to include only a single period instead of all 24 hours of the day. Now is_end=True maps a date-only input to T23. Also expand scalar time dimension before xstac call to avoid a crash when a dataset contains exactly one committed period. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
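The normalization rule can be stated in a few lines. This is a sketch only: `normalize_hourly_bound` is a hypothetical name, and the `is_end` flag is the only detail taken from the commit message.

```python
def normalize_hourly_bound(value: str, is_end: bool = False) -> str:
    """Expand 'YYYY-MM-DD' to an hourly period ID; leave 'YYYY-MM-DDTHH' alone."""
    if "T" in value:
        return value  # an explicit hour is respected as given
    # A date-only end means "through the last hour of that day".
    return f"{value}T23" if is_end else f"{value}T00"
```

A date-only start still maps to `T00`, so a range from `2026-01-01` to `2026-01-01` now covers all 24 hours of the day rather than a single period.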
ERA5-Land stores have a scalar valid_time coordinate alongside the true time dimension. The old hasattr check matched the scalar first, giving coverage start == end == last-fetched-hour instead of the full range. Checking ds.dims instead ensures only actual array dimensions are returned. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Datasets like ERA5-Land have no spatial_ref coordinate but do have x/y dimensions with units=degrees_east/degrees_north. The old detection only checked spatial_ref, causing the deployment CRS (EPSG:32633 for Norway) to be used instead of EPSG:4326 — producing a wrong proj:code in the STAC collection and a garbage bbox from a false UTM→WGS84 reprojection. Now both _detect_dataset_crs and _read_crs_from_spatial_ref fall back to dimension attribute inspection. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
_is_pyramid_zarr only checked for a "0/" subdirectory, which exists
in flat zarr pyramid stores but not in Icechunk stores. Icechunk stores
use an opaque internal layout (chunks/, manifests/, etc.) regardless of
whether the data has pyramid sub-groups. Now falls back to opening the
Icechunk store and checking if "0" is a root group member, so that
the STAC zarr asset href correctly points to /zarr/{id}/0 instead of
the root group, allowing the map viewer to render pyramid data.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sending 4 concurrent requests to data.chc.ucsb.edu triggered a CrowdSec IP ban (HTTP 403). Serial fetching is sufficient; each COG range request completes in under a second. Also adds retry logic for 429/503 responses. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
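The commit does not say how the retry is implemented. One common shape is serial fetching with exponential backoff, sketched below under that assumption; `fetch_with_retry`, the `(status, body)` return convention, and the delay schedule are all hypothetical.

```python
import time

RETRYABLE = {429, 503}


def fetch_with_retry(fetch, url, attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call fetch(url) -> (status, body); back off and retry on 429/503."""
    for attempt in range(attempts):
        status, body = fetch(url)
        if status not in RETRYABLE:
            return status, body
        if attempt < attempts - 1:
            sleep(base_delay * 2**attempt)  # waits 1s, 2s, 4s, ...
    # All attempts exhausted: hand the last retryable response to the caller.
    return status, body
```

Passing `sleep` as a parameter keeps the helper testable without real delays, and a non-retryable status such as 403 is returned immediately rather than hammering a server that has already blocked the client.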
Static stores (time_dim=False, e.g. Copernicus DEM) have no time coordinate. get_time_dim raises ValueError; catch it and return start=end=None so artifact creation completes normally. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A static (time_dim=False) store has no time coordinate, so
read_committed_period_ids previously always returned empty, causing
every re-ingest to re-fetch all source tiles. If the store has spatial
data with non-zero dimensions, return {"static"} so the orchestrator
treats it as already complete and skips the download.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
CoverageTemporal.start/end now accept None so that datasets ingested with time_dim=False (e.g. Copernicus DEM elevation) can be recorded without a temporal extent. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two crashes for static stores (time_dim=False):
1. parse_period_string_to_datetime(None) in temporal extent: guard with a conditional so None start/end pass through as None (valid STAC).
2. get_time_dim(ds) raises ValueError: catch it and set time_dimension=None so xstac omits the time cube:dimension for timeless datasets.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nually xstac requires a temporal_dimension and raises KeyError when None is passed and no CF T axis exists. For static (time_dim=False) datasets, bypass xstac entirely and build only x/y spatial cube:dimensions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…/end Static datasets have temporal.start=None and temporal.end=None. Return early with [[null, null]] STAC interval instead of crashing on parse_period_string_to_datetime(None). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
zarr-layer reads the multiscales attribute at the root to select the appropriate overview level based on viewport zoom. Pointing it at /0 (full resolution) bypassed this and caused all-chunks-at-full-res rendering. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a display.palette alternative to display.colormap + range in dataset YAML. A palette is a dict mapping uint8 pixel value → hex color, which is emitted as climate_api:palette in the STAC renders object. The map viewer builds a 256-entry LUT directly (lut[value] = color) and passes it to zarr-layer with clim=[0,255] so no rescaling occurs — each class value indexes its own color. Legend range labels are suppressed for palette mode since the axis has no numeric meaning. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
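The lookup-table idea is language-independent. The map viewer itself is a JavaScript client, so the Python sketch below only illustrates the `lut[value] = color` construction; `build_lut` and the default color are assumptions.

```python
def build_lut(palette, default="#000000"):
    """Expand {pixel value: hex color} into a 256-entry list of RGB tuples."""

    def to_rgb(hex_color):
        h = hex_color.lstrip("#")
        return tuple(int(h[i : i + 2], 16) for i in (0, 2, 4))

    # Every uint8 value gets an entry; unmapped classes use the default color.
    lut = [to_rgb(default)] * 256
    for value, color in palette.items():
        lut[int(value)] = to_rgb(color)  # each class value indexes its own color
    return lut
```

Because the table has exactly one entry per possible uint8 value, a color limit of `[0, 255]` maps each pixel straight to its class color with no rescaling in between.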
- Wrap long Field descriptions in ingestions/schemas.py - Remove unused artifact_path assignment in stac/services.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- accessor.py: annotate start/end as str | None before try/except
- sync_engine.py: guard against None current_end for temporal/release sync
- stac/services.py: handle None temporal bounds independently when formatting
- test_stac.py: update pyramid href assertions to expect root URL (not /0)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Complete implementation of the streaming per-period ingest architecture from #64, building on the Track 1 probe spike (notebook, branch `probe-spike`).
Track 2: per-period ingest
`climate_api/ingest/protocol.py`: `GridSpec` dataclass and `IngestionPlugin` Protocol. Plugins implement three sync methods and never touch zarr.
Methods are sync: the orchestrator runs `probe` and `fetch_period` in `asyncio.to_thread` so I/O-bound plugins run in the thread pool without managing their own executor. `periods` is called directly (pure computation, no I/O).
`enumerate_periods(start, end, period_type, cutoff=None)` is a shared utility for standard period enumeration (daily/hourly/monthly/yearly with optional availability cutoff). Built-in plugins call it from `periods()`; plugin authors can import it alongside `GridSpec`.
`GridSpec` gains `extra_dims: dict[str, int]` for multidimensional stores.
`rechunk_time` and `pyramid` are optional class attributes read via `getattr` and excluded from the Protocol, so plugins that omit them still pass the `isinstance` check (Python 3.13 checks non-callable protocol members).
`climate_api/ingest/orchestrator.py`: `run_ingest()` async loop:
- … `max_concurrency`)
- … `commit_batch_size` periods
- … `last_committed` from job cursor; falls back to reading the store's committed time coordinates
- … `is_cancel_requested()` before each period; raises `JobCancelledError`
- … `rechunk_time` set), pyramid build (if `pyramid=True`), then `repo.expire_snapshots()` to prune intermediate snapshots
- `run_ingest_sync()` wrapper calls `asyncio.run()` for use in the threaded job framework
`climate_api/ingest/store.py`: `open_or_create_repo()`, `read_committed_period_ids()`, `rechunk_store()`, and `build_pyramid_store()` helpers.
Track 3: store-based sync
Sync reads committed time coordinates from the Icechunk store via `read_committed_period_ids()` and appends only missing periods, replacing the previous `changed_files` mtime-diff approach. Sync availability is derived entirely from `plugin.periods()`; the `sync.availability.latest_available_function` YAML field and `climate_api.providers.availability` module have been removed.
Track 4: in-place rechunk
After initial ingest, `rechunk_store()` rewrites the store with a coarser time chunk (e.g. `time: 1` → `time: 30`) using Icechunk's MVCC. No separate data copy; the previous snapshot is preserved until GC.
Track 5: multiscale pyramid
After rechunk, `build_pyramid_store()` builds a multiscale overview pyramid when the plugin declares `pyramid = True`. Level count is computed automatically from spatial dimensions (512-pixel tile target, 2048×2048 pixel threshold, max 8 levels). The pyramid is written as a DataTree via `topozarr.create_pyramid()` and committed as a new Icechunk snapshot. `open_icechunk_dataset()` detects pyramid stores via `"multiscales" in root.attrs` and opens group `"0"` automatically.
Built-in plugins
- `plugins/era5_land.py`: `Era5LandPlugin` for hourly ERA5-Land from DestinE Earth Data Hub: monthly fetch, daily aggregation, timezone-aware UTC offset. `max_concurrency = 4`, `commit_batch_size = 720`.
- `plugins/chirps3.py`: `Chirps3Plugin` for CHIRPS v3 daily precipitation: COG range requests, only the configured bbox window is downloaded. `max_concurrency = 4`, `commit_batch_size = 30`, `rechunk_time = 30`.
- `plugins/worldpop.py`: `WorldPopPlugin` for WorldPop Global2 yearly population: per-country GeoTIFF download, clipped to bbox. `max_concurrency = 1`, `commit_batch_size = 1`, `pyramid = True`. `country_code` required in `ingestion.params`.
Data accessor and HTTP serving
- `open_icechunk_dataset()` opens an Icechunk store via a readonly MVCC session; detects pyramid stores and opens group `"0"` automatically
- `get_data()` uses `open_icechunk_dataset()` directly via `get_icechunk_path(dataset)`; the legacy flat-zarr and NetCDF fallback paths have been removed
- `/zarr/{dataset_id}` and `/zarr/{dataset_id}/{path}` handle `ArtifactFormat.ICECHUNK` by serving zarr keys through `session.store._get_bytes_sync(key)`; bare group-path requests (e.g. `GET /zarr/dataset/0`) return an HTML directory listing so fsspec's `_ls_real` can traverse pyramid subgroups, enabling `xr.open_zarr(group="0")` over HTTP
- … `/zarr/{dataset_id}`) so zarr-layer clients select zoom levels by opening numbered subgroups; the `sharding_indexed` codec is stripped from pyramid-level metadata for JS client compatibility
- … `/processes/resample`), and publications all recognise `ICECHUNK` alongside `ZARR`
Dataset registry
`ingestion.plugin` replaces `ingestion.function` in dataset templates. The registry validator now requires `ingestion.plugin`. New built-in templates: `chirps3.yaml`, `worldpop.yaml`.
Legacy code removed
This PR removes six categories of code that had accumulated since the pre-Icechunk era. 314 lines deleted across 15 files.
- `ArtifactFormat.NETCDF`: enum value removed; `/datasets/{id}/download` endpoint and the NETCDF download link in `_dataset_links` removed with it
- `ArtifactRecord.path`: single-file legacy field removed; all path-fallback logic (`record.path or record.asset_paths[0]`) replaced with direct `asset_paths[0]`
- `prefer_zarr` parameter: removed from `create_artifact`, `sync_dataset`, `_store_artifact_record`, `_upsert_artifact_record`, `_find_existing_artifact`, `_find_existing_artifact_in_records`, and both sync/system call sites
- `_upgrade_legacy_record`: format-migration function removed; `_load_records` and `_mutate_records` now deserialise directly with `model_validate`
- `list_datasets()` / `open_dataset()`: module-level duplicates in `client.py` removed (superseded by `Client.catalog()` and `Client.open()`)
- `_clear_xstac_collection_cache()`: dead function removed from `stac/services.py`; tests updated to clear the cache dict directly
- `climate_api/providers/` package: `availability.py` and entire module deleted
- `build_dataset_zarr()` and the NetCDF-cache-to-zarr pipeline: removed from `data_manager/services/downloader.py`
- … `get_data()`: all deployed stores are Icechunk
- `_validate_sync_availability()`: validated a YAML field referencing the deleted providers module
Examples
`examples/stac_discover_and_open.py` and `examples/zarr_direct_access.py` simplified: the `_spatial_dims()` helper and conditional dim-name logic have been removed since all stores normalise spatial dimensions to `x` and `y`.
Tests
- `tests/test_ingest_orchestrator.py`: network-free, covering resume, cancellation, cursor, idempotency
- `tests/test_ingest_plugins.py`: CHIRPS3 and WorldPop plugins (probe, periods, fetch_period, URL construction, period clamping, nodata masking, time encoding)
Documentation
- `extensibility.md`: `IngestionPlugin` added as a first-class extension point with full protocol reference and GridSpec field table
- `adding_custom_datasets.md`: plugin path documented; complete plugin skeleton and YAML template example
- `built_in_datasets.md`: ingest method and sync behaviour descriptions updated for all three built-in plugins
- `architecture.md`: dual-path data lifecycle diagram; `IngestionPlugin` in the plugin contract section; updated append execution mode description
- `zarr_and_geozarr.md`: Icechunk section added covering MVCC snapshot model, pyramid detection, and HTTP key-serving
Breaking changes
- `ingestion.function` removed: dataset templates that relied on `ingestion.function` must migrate to `ingestion.plugin`. Built-in templates are unaffected.
- `sync.availability.latest_available_function` removed: the `climate_api.providers.availability` module and all availability function dispatch are gone. Sync end date is now derived from `plugin.periods()`.
- `build_dataset_zarr` removed: the legacy NetCDF-cache-to-zarr pipeline no longer exists. All ingestion goes through `IngestionPlugin` → Icechunk.
- `prefer_zarr` parameter removed: any external callers of `create_artifact` or `sync_dataset` that passed `prefer_zarr` must remove the argument.
- `ArtifactFormat.NETCDF` removed: any code referencing this enum value or the `/datasets/{id}/download` endpoint must be updated.
Related
… `probe-spike`)