
Commit 090ea9d

Parquet batch configuration and documentation for spec updates (#2281)
* parquet documentation and batch configuration
* single quotes
* fix typo
1 parent 47d46d3 commit 090ea9d

2 files changed: 54 additions & 5 deletions

docs/source/publishing/ogcapi-features.rst

Lines changed: 25 additions & 2 deletions
@@ -585,7 +585,7 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a
       - type: feature
         name: Parquet
         data:
-            source: ./tests/data/parquet/random.parquet
+            source: ./tests/data/parquet/naive/random.parquet
         id_field: id
         time_field: time
         x_field:
@@ -595,11 +595,34 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a
           - minlat
           - maxlat
 
-For GeoParquet data, the `x_field` and `y_field` must be specified in the provider definition,
+For older versions of parquet data that do not comply with GeoParquet v1.1, the `x_field` and `y_field` must be specified in the provider definition,
 and they must be arrays of two column names that contain the x and y coordinates of the
 bounding box of each geometry. If the geometries in the data are all points, the `x_field` and `y_field`
 can be strings instead of arrays and refer to a single column each.
 
+.. code-block:: yaml
+
+   providers:
+       - type: feature
+         name: Parquet
+         id_field: id
+         data:
+             source: ./tests/data/parquet/geoparquet1.1/nyc_subset_overture.parquet
+             batch_size: 10000
+             batch_readahead: 2
+
+For GeoParquet data that complies with spec version 1.1, all geometry metadata will be
+detected automatically.
+
+Note that for any version of parquet, you may optionally specify ``batch_size`` and ``batch_readahead`` in the ``data`` section of the parquet provider config.
+``batch_size`` controls how many rows are fetched per batch. Larger batch sizes speed up data processing but add I/O time, such as increased latency when fetching data from an object store. If not defined, it
+defaults to 20,000 rows.
+
+``batch_readahead`` controls how many batches are buffered in memory. If not specified, it defaults to 2.
+Since OGC API Features payloads are often paginated and fairly small, it generally makes sense to specify a small number to avoid reading too many batches ahead of time, especially when fetching from an object store.
+
 .. _PostgreSQL:
 
 PostgreSQL
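
The two options documented above map directly onto pyarrow's dataset scanner. A minimal standalone sketch of that mapping (not pygeoapi code; the file path is illustrative):

    import pyarrow.dataset

    # Illustrative path from the docs example above; any Parquet file works
    ds = pyarrow.dataset.dataset('./tests/data/parquet/naive/random.parquet')

    scanner = ds.scanner(
        batch_size=10_000,   # rows materialized per RecordBatch
        batch_readahead=2,   # batches buffered ahead of the consumer
        use_threads=True,
    )

    # Batches are produced lazily, so peak memory stays bounded by
    # roughly batch_size * batch_readahead rows
    for batch in scanner.to_batches():
        print(batch.num_rows)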

pygeoapi/provider/parquet.py

Lines changed: 29 additions & 3 deletions
@@ -108,7 +108,8 @@ def __init__(self, provider_def):
         name: Parquet
         data:
             source: s3://example.com/parquet_directory/
-
+            batch_size: 10000
+            batch_readahead: 2
         id_field: gml_id
@@ -121,6 +122,23 @@ def __init__(self, provider_def):
 
         # Source url is required
         self.source = self.data.get('source')
+        # When iterating over a dataset, the batch size
+        # controls how many records are read at a time;
+        # a larger batch size can reduce latency for large/complex
+        # requests at the cost of more memory usage
+        # and potentially overfetching.
+        # More information on batching can be found here:
+        # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.scanner # noqa
+        # This value can be reduced to decrease network transfer
+        # if fetching data from an object store.
+        self.batch_size = self.data.get('batch_size', 20_000)
+
+        # batch_readahead is the number of batches to prefetch;
+        # this adds extra memory but can reduce latency for large
+        # or complicated queries. In an OGC API Features context,
+        # it generally makes sense to have some buffering but keep it
+        # low since most responses are small.
+        self.batch_readahead = self.data.get('batch_readahead', 2)
         if not self.source:
             msg = 'Need explicit "source" attr in data' \
                   ' field of provider config'
@@ -136,7 +154,8 @@ def __init__(self, provider_def):
             self.fs = None
 
         # Build pyarrow dataset pointing to the data
-        self.ds = pyarrow.dataset.dataset(self.source, filesystem=self.fs)
+        self.ds: pyarrow.dataset.Dataset = \
+            pyarrow.dataset.dataset(self.source, filesystem=self.fs)
 
         if not self.id_field:
             LOGGER.info(
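
Because ``source`` may point at an object store (as in the docstring example), the same ``pyarrow.dataset.dataset`` call also accepts a remote filesystem. A sketch, assuming a hypothetical publicly readable bucket and pyarrow's built-in S3 filesystem:

    import pyarrow.dataset
    import pyarrow.fs

    # Hypothetical public bucket; anonymous=True skips credential lookup
    fs = pyarrow.fs.S3FileSystem(anonymous=True, region='us-east-1')
    ds = pyarrow.dataset.dataset('example-bucket/parquet_directory/',
                                 filesystem=fs)
    print(ds.schema)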
@@ -231,6 +250,11 @@ def _read_parquet(self, return_scanner=False, **kwargs):
         :returns: generator of RecordBatch with the queried values
         """
         scanner = self.ds.scanner(
+            batch_size=self.batch_size,
+            # the default batch readahead is 16, which is generally
+            # far too high in a server context; we default to 2,
+            # which allows for queueing without excessive reads
+            batch_readahead=self.batch_readahead,
             use_threads=True,
             **kwargs
         )
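
Since the scanner yields RecordBatches lazily, a paginated response only ever pulls the first few batches, which is why a small ``batch_readahead`` avoids fetching data the response never uses. A sketch of that consumption pattern (``first_page`` is a hypothetical helper, not a pygeoapi function):

    def first_page(scanner, limit=10):
        """Collect at most `limit` rows, stopping the scan early."""
        rows = []
        for batch in scanner.to_batches():
            rows.extend(batch.to_pylist())
            if len(rows) >= limit:
                break  # remaining batches are never read
        return rows[:limit]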
@@ -573,7 +597,9 @@ def _response_feature_hits(self, filter):
 
         try:
             scanner = pyarrow.dataset.Scanner.from_dataset(
-                self.ds, filter=filter
+                self.ds, filter=filter,
+                batch_size=self.batch_size,
+                batch_readahead=self.batch_readahead
             )
             return {
                 'type': 'FeatureCollection',
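
For a hits-style response, the same ``Scanner.from_dataset`` call can drive a row count without materializing features. A sketch, with an illustrative filter expression and path:

    import pyarrow.dataset
    import pyarrow.compute as pc

    ds = pyarrow.dataset.dataset('./tests/data/parquet/naive/random.parquet')

    scanner = pyarrow.dataset.Scanner.from_dataset(
        ds,
        filter=pc.field('id') > 100,  # hypothetical attribute filter
        batch_size=20_000,
        batch_readahead=2,
    )

    # Number of rows matching the filter, without building feature payloads
    print(scanner.count_rows())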
