Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/reference/classes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Columnar table containers, column views, indexes, and CTable schema helpers.

CTable
Column
NestedColumn
Index
NullPolicy

Expand Down
55 changes: 55 additions & 0 deletions doc/reference/ctable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,54 @@ Null sentinel values are automatically excluded from all aggregates.
.. automethod:: Column.all


----

.. _NestedColumn:

NestedColumn
============

A read-only accessor for a nested (dotted) group of CTable columns, returned by
attribute access on a :class:`CTable` (or on another ``NestedColumn``) when the
name refers to an internal node of the dotted column tree rather than a leaf.

For a table flattened from a ``struct`` / ``list<struct>`` schema (see
:ref:`Nested fields <NestedFields>`), ``t.trip`` is a ``NestedColumn`` grouping
every leaf under the ``trip.`` prefix, while a leaf such as ``t.trip.sec`` or
``t.trip.begin.lon`` is a :class:`Column`. Drilling into an intermediate node
yields another ``NestedColumn``::

t.trip # <NestedColumn 'trip'>
t.trip.col_names # ['sec', 'km', 'begin.lon', 'begin.lat', ...]
t.trip.begin # <NestedColumn 'trip.begin'>
t.trip.begin.lon # Column
print(t.trip.info) # aggregate metadata over the group

Users do not instantiate ``NestedColumn`` directly.

.. autoclass:: NestedColumn

.. rubric:: Attributes

.. autosummary::

NestedColumn.col_names
NestedColumn.nrows
NestedColumn.ncols
NestedColumn.nbytes
NestedColumn.cbytes
NestedColumn.cratio
NestedColumn.info

.. autoproperty:: NestedColumn.col_names
.. autoproperty:: NestedColumn.nrows
.. autoproperty:: NestedColumn.ncols
.. autoproperty:: NestedColumn.nbytes
.. autoproperty:: NestedColumn.cbytes
.. autoproperty:: NestedColumn.cratio
.. autoproperty:: NestedColumn.info


----

.. _SchemaSpecs:
Expand Down Expand Up @@ -760,6 +808,8 @@ to a typed representation. They are not used as an implicit fallback during
Parquet import; unsupported Arrow/Parquet types still raise unless explicitly
imported through :meth:`CTable.from_arrow` with ``object_fallback=True``.

.. _NestedFields:

Nested fields
-------------

Expand Down Expand Up @@ -803,6 +853,11 @@ attribute proxies::
t["trip.begin.lon"].mean() # Column object (fast path)
t.trip.begin.lon.max() # attribute proxy, same column

Accessing an intermediate prefix such as ``t.trip`` or ``t.trip.begin`` returns
a :class:`~blosc2.NestedColumn` that groups all descendant leaves and reports
aggregate metadata via :attr:`~blosc2.NestedColumn.info`; a leaf such as
``t.trip.begin.lon`` returns a :class:`Column`.

A literal ``.``, ``/``, or ``\\`` inside an Arrow field name is escaped with a
backslash in the logical column name. For example, path segments
``("trip.info", "begin/point", "lon.deg")`` become::
Expand Down
4 changes: 4 additions & 0 deletions src/blosc2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ def _raise(exc):
DEFAULT_NULL_POLICY,
Column,
CTable,
NestedColumn,
NullPolicy,
RowTransformer,
get_null_policy,
Expand Down Expand Up @@ -827,8 +828,11 @@ def _raise(exc):
"group_reduce",
# Classes
"C2Array",
"Column",
"CParams",
"CTable",
"CTableGroupBy",
"NestedColumn",
"RowTransformer",
"Batch",
"BatchArray",
Expand Down
4 changes: 2 additions & 2 deletions src/blosc2/b2view/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ def object_metadata(obj: Any) -> dict[str, Any]:
return dict(obj.info_items)
except Exception:
return {
"rows": getattr(obj, "nrows", len(obj)),
"columns": getattr(obj, "ncols", len(getattr(obj, "col_names", []))),
"nrows": getattr(obj, "nrows", len(obj)),
"ncols": getattr(obj, "ncols", len(getattr(obj, "col_names", []))),
"schema": {
name: str(getattr(obj[name], "dtype", None)) for name in getattr(obj, "col_names", [])
},
Expand Down
2 changes: 1 addition & 1 deletion src/blosc2/batch_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ def info_items(self) -> list:
("nitems", sum(batch_sizes)),
("nbytes", format_nbytes_info(self.nbytes)),
("cbytes", format_nbytes_info(self.cbytes)),
("cratio", f"{self.cratio:.2f}"),
("cratio", f"{self.cratio:.2f}x"),
("cparams", self.cparams),
("dparams", self.dparams),
]
Expand Down
2 changes: 1 addition & 1 deletion src/blosc2/c2array.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def info_items(self) -> list:
items += [("dtype", self.dtype)]
items += [("nbytes", format_nbytes_info(self.nbytes))]
items += [("cbytes", format_nbytes_info(self.cbytes))]
items += [("cratio", f"{self.cratio:.2f}")]
items += [("cratio", f"{self.cratio:.2f}x")]
items += [("cparams", self.cparams)]
# items += [("dparams", self.dparams)]
return items
Expand Down
47 changes: 43 additions & 4 deletions src/blosc2/cli/parquet_to_blosc2.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
DEFAULT_BATCH_SIZE = 2048
MAX_ELEMENT_WRITE_BATCH = 5_000_000 # cap on flattened elements yielded per write
UNNAMED_ROOT_CAPACITY_SAFETY = 1.15 # first-batch estimates are often a little low
# Target in-memory size of one Arrow read batch for the unnamed-root flatten
# path. Nested list<struct> batches amplify ~10x downstream (flatten + cast +
# write buffers + Arrow pool), so an auto parquet batch size is capped to keep
# this Arrow batch small enough that peak RSS stays well under ~1 GB.
PARQUET_BATCH_ARROW_BUDGET = 48 * 2**20 # 48 MiB


def require_pyarrow():
Expand Down Expand Up @@ -242,6 +247,16 @@ def build_parser() -> argparse.ArgumentParser:
)
parser.add_argument("--codec", type=str, default="ZSTD", choices=[c.name for c in blosc2.Codec])
parser.add_argument("--clevel", type=int, default=5)
parser.add_argument(
"--reduce-mem",
action="store_true",
help=(
"Shrink an auto-chosen Parquet batch size so a single Arrow read batch fits a "
"small memory budget, lowering peak RSS at the cost of import speed. "
"Only affects auto batch sizing for unnamed-root list<struct<...>> flattening; "
"an explicit --batch-size is always left untouched."
),
)
parser.add_argument(
"--mem-report",
action="store_true",
Expand Down Expand Up @@ -1018,6 +1033,27 @@ def _flatten_root_batches_with_progress(
break


def _apply_parquet_batch_memory_budget(args, sample, n_outer_sampled: int) -> None:
"""Shrink an auto parquet batch size so one Arrow read batch fits the budget.

Nested list<struct> batches amplify several-fold downstream (flatten + cast
+ write buffers + Arrow pool), so an auto-chosen parquet batch size is capped
to keep peak RSS well under ~1 GB. An explicit --parquet-batch-size is left
untouched.

Opt-in via --reduce-mem: it trades import speed for lower peak RSS, so the
default keeps the original (larger) auto batch sizes.
"""
if not getattr(args, "reduce_mem", False):
return
if not getattr(args, "parquet_batch_size_auto", False):
return
bytes_per_outer = sample.nbytes / n_outer_sampled
if bytes_per_outer > 0:
budget_rows = max(1, int(PARQUET_BATCH_ARROW_BUDGET / bytes_per_outer))
args.parquet_batch_size = min(args.parquet_batch_size, budget_rows)


def import_unnamed_root_separate_cols(
args,
input_path: Path,
Expand Down Expand Up @@ -1051,14 +1087,16 @@ def import_unnamed_root_separate_cols(
estimated_batch_rows = None
if total_parquet_rows is not None and total_parquet_rows > 0:
try:
sample = next(
pf.iter_batches(batch_size=min(args.parquet_batch_size, total_parquet_rows)),
None,
)
# Sample only a few outer rows: enough for the per-outer-row ratio
# and byte estimate, while avoiding a large transient Arrow batch
# (which the Arrow pool would retain and inflate peak RSS).
sample_rows = min(args.parquet_batch_size, total_parquet_rows, 64)
sample = next(pf.iter_batches(batch_size=sample_rows), None)
if sample is not None and len(sample) > 0:
n_outer_sampled = len(sample)
n_elems_sampled = len(sample.column(0).flatten())
avg_per_outer_row = n_elems_sampled / n_outer_sampled
_apply_parquet_batch_memory_budget(args, sample, n_outer_sampled)
estimated_batch_rows = max(1, round(args.parquet_batch_size * avg_per_outer_row))
estimate = round(total_parquet_rows * avg_per_outer_row)
if args.max_rows is None:
Expand Down Expand Up @@ -1515,6 +1553,7 @@ def resolve_default_batch_sizes(args, *, parquet_specified: bool, blosc2_specifi
# Parquet batches are outer rows, while Blosc2 batches are flattened
# CTable rows. Keep them independent so a large write batch does not
# accidentally imply a huge Parquet read batch (and vice versa).
args.parquet_batch_size_auto = not parquet_specified
if not parquet_specified:
args.parquet_batch_size = average_parquet_row_group_size(args.input_path) or DEFAULT_BATCH_SIZE
if not blosc2_specified:
Expand Down
Loading
Loading