diff --git a/doc/reference/classes.rst b/doc/reference/classes.rst index a0500f4f..ff211f5e 100644 --- a/doc/reference/classes.rst +++ b/doc/reference/classes.rst @@ -55,6 +55,7 @@ Columnar table containers, column views, indexes, and CTable schema helpers. CTable Column + NestedColumn Index NullPolicy diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 84b0c270..3dbbcda3 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -615,6 +615,54 @@ Null sentinel values are automatically excluded from all aggregates. .. automethod:: Column.all +---- + +.. _NestedColumn: + +NestedColumn +============ + +A read-only accessor for a nested (dotted) group of CTable columns, returned by +attribute access on a :class:`CTable` (or on another ``NestedColumn``) when the +name refers to an internal node of the dotted column tree rather than a leaf. + +For a table flattened from a ``struct`` / ``list`` schema (see +:ref:`Nested fields <NestedFields>`), ``t.trip`` is a ``NestedColumn`` grouping +every leaf under the ``trip.`` prefix, while a leaf such as ``t.trip.sec`` or +``t.trip.begin.lon`` is a :class:`Column`. Drilling into an intermediate node +yields another ``NestedColumn``:: + + t.trip # NestedColumn + t.trip.col_names # ['sec', 'km', 'begin.lon', 'begin.lat', ...] + t.trip.begin # NestedColumn + t.trip.begin.lon # Column + print(t.trip.info) # aggregate metadata over the group + +Users do not instantiate ``NestedColumn`` directly. + +.. autoclass:: NestedColumn + +.. rubric:: Attributes + +.. autosummary:: + + NestedColumn.col_names + NestedColumn.nrows + NestedColumn.ncols + NestedColumn.nbytes + NestedColumn.cbytes + NestedColumn.cratio + NestedColumn.info + +.. autoproperty:: NestedColumn.col_names +.. autoproperty:: NestedColumn.nrows +.. autoproperty:: NestedColumn.ncols +.. autoproperty:: NestedColumn.nbytes +.. autoproperty:: NestedColumn.cbytes +.. autoproperty:: NestedColumn.cratio +.. autoproperty:: NestedColumn.info + + ---- .. 
_SchemaSpecs: @@ -760,6 +808,8 @@ to a typed representation. They are not used as an implicit fallback during Parquet import; unsupported Arrow/Parquet types still raise unless explicitly imported through :meth:`CTable.from_arrow` with ``object_fallback=True``. +.. _NestedFields: + Nested fields ------------- @@ -803,6 +853,11 @@ attribute proxies:: t["trip.begin.lon"].mean() # Column object (fast path) t.trip.begin.lon.max() # attribute proxy, same column +Accessing an intermediate prefix such as ``t.trip`` or ``t.trip.begin`` returns +a :class:`~blosc2.NestedColumn` that groups all descendant leaves and reports +aggregate metadata via :attr:`~blosc2.NestedColumn.info`; a leaf such as +``t.trip.begin.lon`` returns a :class:`Column`. + A literal ``.``, ``/``, or ``\\`` inside an Arrow field name is escaped with a backslash in the logical column name. For example, path segments ``("trip.info", "begin/point", "lon.deg")`` become:: diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index 6013967b..4bcec2d0 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -632,6 +632,7 @@ def _raise(exc): DEFAULT_NULL_POLICY, Column, CTable, + NestedColumn, NullPolicy, RowTransformer, get_null_policy, @@ -827,8 +828,11 @@ def _raise(exc): "group_reduce", # Classes "C2Array", + "Column", "CParams", + "CTable", "CTableGroupBy", + "NestedColumn", "RowTransformer", "Batch", "BatchArray", diff --git a/src/blosc2/b2view/model.py b/src/blosc2/b2view/model.py index 3732aeb4..8fc5f630 100644 --- a/src/blosc2/b2view/model.py +++ b/src/blosc2/b2view/model.py @@ -341,8 +341,8 @@ def object_metadata(obj: Any) -> dict[str, Any]: return dict(obj.info_items) except Exception: return { - "rows": getattr(obj, "nrows", len(obj)), - "columns": getattr(obj, "ncols", len(getattr(obj, "col_names", []))), + "nrows": getattr(obj, "nrows", len(obj)), + "ncols": getattr(obj, "ncols", len(getattr(obj, "col_names", []))), "schema": { name: str(getattr(obj[name], "dtype", None)) for 
name in getattr(obj, "col_names", []) }, diff --git a/src/blosc2/batch_array.py b/src/blosc2/batch_array.py index 72c435a9..ef3b8fbb 100644 --- a/src/blosc2/batch_array.py +++ b/src/blosc2/batch_array.py @@ -948,7 +948,7 @@ def info_items(self) -> list: ("nitems", sum(batch_sizes)), ("nbytes", format_nbytes_info(self.nbytes)), ("cbytes", format_nbytes_info(self.cbytes)), - ("cratio", f"{self.cratio:.2f}"), + ("cratio", f"{self.cratio:.2f}x"), ("cparams", self.cparams), ("dparams", self.dparams), ] diff --git a/src/blosc2/c2array.py b/src/blosc2/c2array.py index 4f2a3bda..d785bf4d 100644 --- a/src/blosc2/c2array.py +++ b/src/blosc2/c2array.py @@ -477,7 +477,7 @@ def info_items(self) -> list: items += [("dtype", self.dtype)] items += [("nbytes", format_nbytes_info(self.nbytes))] items += [("cbytes", format_nbytes_info(self.cbytes))] - items += [("cratio", f"{self.cratio:.2f}")] + items += [("cratio", f"{self.cratio:.2f}x")] items += [("cparams", self.cparams)] # items += [("dparams", self.dparams)] return items diff --git a/src/blosc2/cli/parquet_to_blosc2.py b/src/blosc2/cli/parquet_to_blosc2.py index 3f6520bf..d4da3018 100644 --- a/src/blosc2/cli/parquet_to_blosc2.py +++ b/src/blosc2/cli/parquet_to_blosc2.py @@ -49,6 +49,11 @@ DEFAULT_BATCH_SIZE = 2048 MAX_ELEMENT_WRITE_BATCH = 5_000_000 # cap on flattened elements yielded per write UNNAMED_ROOT_CAPACITY_SAFETY = 1.15 # first-batch estimates are often a little low +# Target in-memory size of one Arrow read batch for the unnamed-root flatten +# path. Nested list batches amplify ~10x downstream (flatten + cast + +# write buffers + Arrow pool), so an auto parquet batch size is capped to keep +# this Arrow batch small enough that peak RSS stays well under ~1 GB. 
+PARQUET_BATCH_ARROW_BUDGET = 48 * 2**20 # 48 MiB def require_pyarrow(): @@ -242,6 +247,16 @@ def build_parser() -> argparse.ArgumentParser: ) parser.add_argument("--codec", type=str, default="ZSTD", choices=[c.name for c in blosc2.Codec]) parser.add_argument("--clevel", type=int, default=5) + parser.add_argument( + "--reduce-mem", + action="store_true", + help=( + "Shrink an auto-chosen Parquet batch size so a single Arrow read batch fits a " + "small memory budget, lowering peak RSS at the cost of import speed. " + "Only affects auto batch sizing for unnamed-root list> flattening; " + "an explicit --batch-size is always left untouched." + ), + ) parser.add_argument( "--mem-report", action="store_true", @@ -1018,6 +1033,27 @@ def _flatten_root_batches_with_progress( break +def _apply_parquet_batch_memory_budget(args, sample, n_outer_sampled: int) -> None: + """Shrink an auto parquet batch size so one Arrow read batch fits the budget. + + Nested list batches amplify several-fold downstream (flatten + cast + + write buffers + Arrow pool), so an auto-chosen parquet batch size is capped + to keep peak RSS well under ~1 GB. An explicit --parquet-batch-size is left + untouched. + + Opt-in via --reduce-mem: it trades import speed for lower peak RSS, so the + default keeps the original (larger) auto batch sizes. 
+ """ + if not getattr(args, "reduce_mem", False): + return + if not getattr(args, "parquet_batch_size_auto", False): + return + bytes_per_outer = sample.nbytes / n_outer_sampled + if bytes_per_outer > 0: + budget_rows = max(1, int(PARQUET_BATCH_ARROW_BUDGET / bytes_per_outer)) + args.parquet_batch_size = min(args.parquet_batch_size, budget_rows) + + def import_unnamed_root_separate_cols( args, input_path: Path, @@ -1051,14 +1087,16 @@ def import_unnamed_root_separate_cols( estimated_batch_rows = None if total_parquet_rows is not None and total_parquet_rows > 0: try: - sample = next( - pf.iter_batches(batch_size=min(args.parquet_batch_size, total_parquet_rows)), - None, - ) + # Sample only a few outer rows: enough for the per-outer-row ratio + # and byte estimate, while avoiding a large transient Arrow batch + # (which the Arrow pool would retain and inflate peak RSS). + sample_rows = min(args.parquet_batch_size, total_parquet_rows, 64) + sample = next(pf.iter_batches(batch_size=sample_rows), None) if sample is not None and len(sample) > 0: n_outer_sampled = len(sample) n_elems_sampled = len(sample.column(0).flatten()) avg_per_outer_row = n_elems_sampled / n_outer_sampled + _apply_parquet_batch_memory_budget(args, sample, n_outer_sampled) estimated_batch_rows = max(1, round(args.parquet_batch_size * avg_per_outer_row)) estimate = round(total_parquet_rows * avg_per_outer_row) if args.max_rows is None: @@ -1515,6 +1553,7 @@ def resolve_default_batch_sizes(args, *, parquet_specified: bool, blosc2_specifi # Parquet batches are outer rows, while Blosc2 batches are flattened # CTable rows. Keep them independent so a large write batch does not # accidentally imply a huge Parquet read batch (and vice versa). 
+ args.parquet_batch_size_auto = not parquet_specified if not parquet_specified: args.parquet_batch_size = average_parquet_row_group_size(args.input_path) or DEFAULT_BATCH_SIZE if not blosc2_specified: diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 8f088307..a156bd13 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -1207,33 +1207,41 @@ def info_items(self) -> list[tuple[str, object]]: spec = col_meta.spec if col_meta is not None else None chunks = getattr(raw, "chunks", None) blocks = getattr(raw, "blocks", None) + + if self.is_list: + backend = "list" + elif self.is_varlen_scalar: + backend = "variable-length scalar" + elif self.is_dictionary: + backend = "dictionary" + else: + backend = "NDArray" if isinstance(raw, blosc2.NDArray) else type(raw).__name__ + + # Virtual computed columns are not stored; otherwise report the table's + # storage kind, mirroring CTable.info's persistent/in-memory wording. + if self.is_computed: + storage = "computed" + elif isinstance(table._storage, FileTableStorage): + storage = "persistent" + else: + storage = "in-memory" + + # Block order mirrors CTable.info: identity, shape/grid, sizes, content, + # then compression params. 
items: list[tuple[str, object]] = [ ("type", self.__class__.__name__), ("name", self._col_name), - ("nrows", len(self)), - ("shape", self.shape), + ("dtype", table._dtype_info_label(self.dtype, spec)), + ("backend", backend), + ("storage", storage), ] + + items.append(("nrows", len(self))) + items.append(("shape", self.shape)) if chunks is not None: items.append(("chunks", chunks)) if blocks is not None: items.append(("blocks", blocks)) - items.extend( - [ - ("dtype", table._dtype_info_label(self.dtype, spec)), - ("computed", self.is_computed), - ("nullable", self.null_value is not None or getattr(spec, "nullable", False)), - ] - ) - - if self.is_list: - items.append(("storage", "list")) - elif self.is_varlen_scalar: - items.append(("storage", "variable-length scalar")) - elif self.is_dictionary: - items.append(("storage", "dictionary")) - items.append(("dictionary_size", len(raw.dictionary))) - else: - items.append(("storage", "ndarray" if isinstance(raw, blosc2.NDArray) else type(raw).__name__)) nbytes = getattr(raw, "nbytes", None) cbytes = getattr(raw, "cbytes", None) @@ -1243,11 +1251,12 @@ def info_items(self) -> list[tuple[str, object]]: if cbytes is not None: items.append(("cbytes", format_nbytes_info(cbytes))) if cratio is not None: - items.append(("cratio", f"{cratio:.2f}")) + items.append(("cratio", f"{cratio:.2f}x")) + + items.append(("nullable", self.null_value is not None or getattr(spec, "nullable", False))) + if self.is_dictionary: + items.append(("dictionary_size", len(raw.dictionary))) - urlpath = getattr(raw, "urlpath", None) - if urlpath is not None: - items.append(("urlpath", urlpath)) cparams = getattr(raw, "cparams", None) dparams = getattr(raw, "dparams", None) if cparams is not None: @@ -2477,11 +2486,29 @@ def __iter__(self): yield self._row_value_at_logical(i) -class _NestedColumnNamespace: - """Attribute proxy for dotted nested column paths. +class NestedColumn: + """A read-only accessor for a nested (dotted) group of CTable columns. 
class _ChunkAlignedWriter:
    """Buffer writes to a fixed-size array and flush them chunk-aligned.

    During Arrow/Parquet import the incoming batches have variable, non
    chunk-aligned sizes, so writing each one straight to ``arr[pos:pos+m]``
    makes most writes straddle chunk boundaries, which forces partially
    filled chunks to be merged and recompressed. This buffer accumulates
    appended arrays and writes them out in exact ``chunk_len`` pieces
    starting at row 0, so each write covers exactly one chunk. Only the
    final :meth:`flush` may write a partial (sub-chunk) tail.

    The writer never resizes ``arr``; the caller must make sure it is large
    enough for everything that gets appended.

    Parameters
    ----------
    arr
        Destination supporting slice assignment along the first axis.
    chunk_len
        Number of rows per chunk along the first axis; must be positive.

    Raises
    ------
    ValueError
        If ``chunk_len`` is smaller than 1.
    """

    __slots__ = ("arr", "chunk_len", "pending", "pending_n", "wpos")

    def __init__(self, arr, chunk_len: int) -> None:
        from collections import deque

        # A non-positive chunk length can never be satisfied by the drain
        # loop in append(); fail early with a clear message instead.
        if chunk_len < 1:
            raise ValueError(f"chunk_len must be a positive integer, got {chunk_len!r}")
        self.arr = arr
        self.chunk_len = chunk_len
        # FIFO of not-yet-written blocks; a deque gives O(1) removal at the
        # head, unlike list.pop(0).
        self.pending: deque[np.ndarray] = deque()
        self.pending_n = 0  # total rows currently buffered
        self.wpos = 0  # next row of ``arr`` to be written

    def append(self, block: np.ndarray) -> None:
        """Queue *block* and write out every complete chunk now available."""
        if len(block) == 0:
            return
        self.pending.append(block)
        self.pending_n += len(block)
        while self.pending_n >= self.chunk_len:
            self._write(self._take(self.chunk_len))

    def flush(self) -> None:
        """Write whatever is still buffered (at most one partial chunk)."""
        if self.pending_n:
            self._write(self._take(self.pending_n))

    def _write(self, block: np.ndarray) -> None:
        """Store *block* at the current write position and advance it."""
        n = len(block)
        self.arr[self.wpos : self.wpos + n] = block
        self.wpos += n

    def _take(self, n: int) -> np.ndarray:
        """Pull exactly *n* rows from the front of the pending queue."""
        pending = self.pending
        # Fast path: the head array already holds exactly the requested rows.
        if len(pending[0]) == n:
            self.pending_n -= n
            return pending.popleft()
        parts: list[np.ndarray] = []
        need = n
        while need > 0:
            head = pending[0]
            if len(head) <= need:
                parts.append(pending.popleft())
                need -= len(head)
            else:
                # Split the head: hand out the first rows, keep the rest queued.
                parts.append(head[:need])
                pending[0] = head[need:]
                need = 0
        self.pending_n -= n
        return parts[0] if len(parts) == 1 else np.concatenate(parts)
# Candidate itemsizes (bytes) that the representative itemsize is snapped to
# when sizing the table-wide shared chunk/block grid.
_COMMON_ITEMSIZES = (1, 2, 4, 8, 16)

# Largest itemsize (bytes) at which a fixed-width string/bytes column still
# joins the shared grid; wider string columns keep their own sizing.
_MAX_ALIGNED_STR_ITEMSIZE = 128

@staticmethod
def _snap_itemsize(median: float) -> int:
    """Return the entry of ``_COMMON_ITEMSIZES`` closest to *median*.

    ``min`` keeps the first of several equally close candidates and the
    candidates are listed in ascending order, so ties resolve to the smaller
    value (a median of 6 snaps to 4, not 8).
    """
    return min(CTable._COMMON_ITEMSIZES, key=lambda candidate: abs(median - candidate))

@classmethod
def _compute_aligned_grid(cls, columns, capacity: int):
    """Pick one ``(chunks, blocks)`` grid for the eligible fixed-size columns.

    A column is eligible when it is not a list, variable-length scalar or
    dictionary column, has a dtype, has no user-set ``config.chunks`` /
    ``config.blocks``, and its physical shape for *capacity* rows is 1-D.
    Eligible columns are split into fixed-width string/bytes columns (dtype
    kind ``"U"`` or ``"S"``) and all others ("numeric").

    The grid is computed for the median itemsize of the numeric columns
    (or of the string columns when there are no numeric ones), snapped via
    :meth:`_snap_itemsize`. A numeric column adopts the grid only if the
    shared chunk length is at most 4x the chunk length it would get on its
    own; a string column adopts it only if its itemsize does not exceed
    ``_MAX_ALIGNED_STR_ITEMSIZE``.

    Parameters
    ----------
    columns
        Iterable of compiled schema columns.
    capacity
        Number of rows the columns are being sized for.

    Returns
    -------
    tuple
        ``(shared_chunks, shared_blocks, names)`` where ``names`` is the set
        of column names that should use the shared grid, or
        ``(None, None, set())`` when no column qualifies.
    """
    fixed_numeric = []
    fixed_strings = []
    for column in columns:
        # Checks run in this order on purpose: later ones read attributes
        # that only make sense once the earlier ones have passed.
        if (
            cls._is_list_column(column)
            or cls._is_varlen_scalar_column(column)
            or cls._is_dictionary_column(column)
            or column.dtype is None
            or column.config.chunks is not None
            or column.config.blocks is not None
            or len(cls._column_physical_shape(column, capacity)) != 1
        ):
            continue
        is_text = np.dtype(column.dtype).kind in ("U", "S")
        (fixed_strings if is_text else fixed_numeric).append(column)

    # Numeric columns drive the grid size; strings only do so when they are
    # all there is.
    sizing_basis = fixed_numeric if fixed_numeric else fixed_strings
    if not sizing_basis:
        return None, None, set()

    median_itemsize = float(np.median([np.dtype(column.dtype).itemsize for column in sizing_basis]))
    grid_dtype = np.dtype(f"V{cls._snap_itemsize(median_itemsize)}")
    shared_chunks, shared_blocks = compute_chunks_blocks((capacity,), dtype=grid_dtype)

    aligned = set()
    for column in fixed_numeric:
        own_shape = cls._column_physical_shape(column, capacity)
        own_chunks, _own_blocks = cls._column_chunks_blocks(column, own_shape)
        # Relative cap, measured in rows: skip columns whose chunk would grow
        # more than 4x under the shared grid.
        if 4 * own_chunks[0] >= shared_chunks[0]:
            aligned.add(column.name)
    # Strings use an absolute byte ceiling rather than the relative cap.
    aligned.update(
        column.name
        for column in fixed_strings
        if np.dtype(column.dtype).itemsize <= cls._MAX_ALIGNED_STR_ITEMSIZE
    )

    if aligned:
        return shared_chunks, shared_blocks, aligned
    return None, None, set()
+ shared_chunks, shared_blocks, aligned_names = self._compute_aligned_grid( + self._schema.columns, capacity + ) # --- valid_rows (all True, compacted) --- disk_valid = storage.create_valid_rows( shape=(capacity,), - chunks=default_chunks, - blocks=default_blocks, + chunks=shared_chunks if shared_chunks is not None else default_chunks, + blocks=shared_blocks if shared_blocks is not None else default_blocks, ) if n_live > 0: disk_valid[:n_live] = True @@ -4130,7 +4350,10 @@ def _save_to_storage(self, storage: TableStorage) -> None: disk_dc.codes[:n_live] = raw_codes continue shape = self._column_physical_shape(col, capacity) - dtype_chunks, dtype_blocks = self._column_chunks_blocks(col, shape) + if col.name in aligned_names: + dtype_chunks, dtype_blocks = shared_chunks, shared_blocks + else: + dtype_chunks, dtype_blocks = self._column_chunks_blocks(col, shape) col_storage = self._resolve_column_storage(col, dtype_chunks, dtype_blocks) disk_col = storage.create_column( name, @@ -4264,7 +4487,7 @@ def _open_from_treestore(cls, store: blosc2.TreeStore, full_key: str) -> CTable: return cls._open_from_storage(storage) @classmethod - def load(cls, urlpath: str) -> CTable: + def load(cls, urlpath: str) -> CTable: # noqa: C901 """Load a persistent table from *urlpath* into RAM. The schema is read from the table's metadata — the original Python @@ -4308,11 +4531,14 @@ def load(cls, urlpath: str) -> CTable: mem_storage = InMemoryTableStorage() bool_chunks, bool_blocks = compute_chunks_blocks((capacity,), dtype=np.dtype(np.bool_)) + # Align fixed-size scalar columns (and the _valid_rows mask) on one + # shared grid so lazy expressions over them take the fast_eval path. 
+ shared_chunks, shared_blocks, aligned_names = cls._compute_aligned_grid(schema.columns, capacity) mem_valid = mem_storage.create_valid_rows( shape=(capacity,), - chunks=bool_chunks, - blocks=bool_blocks, + chunks=shared_chunks if shared_chunks is not None else bool_chunks, + blocks=shared_blocks if shared_blocks is not None else bool_blocks, ) if phys_size > 0: mem_valid[:phys_size] = disk_valid[:] @@ -4344,7 +4570,10 @@ def load(cls, urlpath: str) -> CTable: mem_cols[name] = mem_col continue shape = cls._column_physical_shape(col, capacity) - col_chunks, col_blocks = cls._column_chunks_blocks(col, shape) + if name in aligned_names: + col_chunks, col_blocks = shared_chunks, shared_blocks + else: + col_chunks, col_blocks = cls._column_chunks_blocks(col, shape) mem_col = mem_storage.create_column( name, dtype=col.dtype, @@ -5483,9 +5712,12 @@ def _create_arrow_import_columns( cls, storage: TableStorage, columns: list[CompiledColumn], capacity: int, cparams, dparams ): default_chunks, default_blocks = compute_chunks_blocks((capacity,)) - new_valid = storage.create_valid_rows( - shape=(capacity,), chunks=default_chunks, blocks=default_blocks - ) + # Align fixed-size scalar columns (and the _valid_rows mask) on one + # shared grid so lazy expressions over them take the fast_eval path. 
+ shared_chunks, shared_blocks, aligned_names = cls._compute_aligned_grid(columns, capacity) + valid_chunks = shared_chunks if shared_chunks is not None else default_chunks + valid_blocks = shared_blocks if shared_blocks is not None else default_blocks + new_valid = storage.create_valid_rows(shape=(capacity,), chunks=valid_chunks, blocks=valid_blocks) new_cols: dict[str, blosc2.NDArray | ListArray | _ScalarVarLenArray | DictionaryColumn] = {} for col in columns: if cls._is_list_column(col): @@ -5497,16 +5729,29 @@ def _create_arrow_import_columns( col.name, spec=col.spec, cparams=cparams, dparams=dparams ) elif cls._is_dictionary_column(col): - dict_col = storage.create_dictionary_column( - col.name, spec=col.spec, cparams=cparams, dparams=dparams + # Create the int32 codes array at full capacity with the aligned + # grid (codes are int32, so they match the shared numeric grid). + # This avoids a create-then-resize and the catastrophic 4096-row + # default chunking, and lets dict-column filters use the fast path. 
+ if shared_chunks is not None: + codes_chunks, codes_blocks = shared_chunks, shared_blocks + else: + codes_chunks, codes_blocks = compute_chunks_blocks((capacity,), dtype=np.dtype(np.int32)) + new_cols[col.name] = storage.create_dictionary_column( + col.name, + spec=col.spec, + cparams=cparams, + dparams=dparams, + codes_shape=(capacity,), + codes_chunks=codes_chunks, + codes_blocks=codes_blocks, ) - if len(dict_col.codes) < capacity: - dict_col.resize((capacity,)) - new_cols[col.name] = dict_col else: shape = cls._column_physical_shape(col, capacity) chunks, blocks = default_chunks, default_blocks - if col.dtype is not None: + if col.name in aligned_names: + chunks, blocks = shared_chunks, shared_blocks + elif col.dtype is not None: chunks, blocks = cls._column_chunks_blocks(col, shape) new_cols[col.name] = storage.create_column( col.name, @@ -5615,12 +5860,28 @@ def _write_arrow_batches(cls, obj, batches, columns, new_cols, new_valid) -> Non for col in columns if cls._is_list_column(col) } + # Buffer fixed-size column writes and flush them chunk-aligned so each + # chunk is compressed once instead of being merged on every batch. + writers = { + col.name: _ChunkAlignedWriter(new_cols[col.name], new_cols[col.name].chunks[0]) + for col in columns + if not ( + cls._is_list_column(col) + or cls._is_varlen_scalar_column(col) + or cls._is_dictionary_column(col) + ) + } for batch in batches: end = pos + len(batch) while end > len(new_valid): obj._grow() new_valid = obj._valid_rows - pos = cls._write_arrow_batch(batch, columns, new_cols, new_valid, pos, list_normalizers) + pos = cls._write_arrow_batch(batch, columns, new_cols, new_valid, pos, list_normalizers, writers) + for writer in writers.values(): + writer.flush() + # All imported rows are valid; mark them in a single aligned write. 
+ if pos: + new_valid[:pos] = True for col in columns: if ( cls._is_list_column(col) @@ -5633,7 +5894,9 @@ def _write_arrow_batches(cls, obj, batches, columns, new_cols, new_valid) -> Non obj._last_pos = pos @classmethod - def _write_arrow_batch(cls, batch, columns, new_cols, new_valid, pos: int, list_normalizers) -> int: + def _write_arrow_batch( + cls, batch, columns, new_cols, new_valid, pos: int, list_normalizers, writers + ) -> int: m = len(batch) if m == 0: return pos @@ -5663,8 +5926,7 @@ def _write_arrow_batch(cls, batch, columns, new_cols, new_valid, pos: int, list_ # Plain string array: encode values into the dictionary. new_cols[col.name][pos : pos + m] = arrow_col.to_pylist() else: - new_cols[col.name][pos : pos + m] = cls._arrow_column_to_numpy(arrow_col, col) - new_valid[pos : pos + m] = True + writers[col.name].append(cls._arrow_column_to_numpy(arrow_col, col)) return pos + m @staticmethod @@ -6575,17 +6837,23 @@ def from_csv( n = len(col_data[0]) if ncols > 0 else 0 capacity = max(n, 1) default_chunks, default_blocks = compute_chunks_blocks((capacity,)) + # Align fixed-size scalar columns (and the _valid_rows mask) on one + # shared grid so lazy expressions over them take the fast_eval path. 
+ shared_chunks, shared_blocks, aligned_names = cls._compute_aligned_grid(schema.columns, capacity) mem_storage = InMemoryTableStorage() new_valid = mem_storage.create_valid_rows( shape=(capacity,), - chunks=default_chunks, - blocks=default_blocks, + chunks=shared_chunks if shared_chunks is not None else default_chunks, + blocks=shared_blocks if shared_blocks is not None else default_blocks, ) new_cols: dict[str, blosc2.NDArray] = {} for col in schema.columns: shape = cls._column_physical_shape(col, capacity) - chunks, blocks = cls._column_chunks_blocks(col, shape) + if col.name in aligned_names: + chunks, blocks = shared_chunks, shared_blocks + else: + chunks, blocks = cls._column_chunks_blocks(col, shape) new_cols[col.name] = mem_storage.create_column( col.name, dtype=col.dtype, @@ -6720,12 +6988,15 @@ def from_pandas(cls, df, row_cls) -> CTable: # noqa: C901 n = len(df) capacity = max(n, 1) default_chunks, default_blocks = compute_chunks_blocks((capacity,)) + # Align fixed-size scalar columns (and the _valid_rows mask) on one + # shared grid so lazy expressions over them take the fast_eval path. 
+ shared_chunks, shared_blocks, aligned_names = cls._compute_aligned_grid(schema.columns, capacity) mem_storage = InMemoryTableStorage() new_valid = mem_storage.create_valid_rows( shape=(capacity,), - chunks=default_chunks, - blocks=default_blocks, + chunks=shared_chunks if shared_chunks is not None else default_chunks, + blocks=shared_blocks if shared_blocks is not None else default_blocks, ) new_cols: dict[str, Any] = {} for col in schema.columns: @@ -6757,7 +7028,10 @@ def from_pandas(cls, df, row_cls) -> CTable: # noqa: C901 new_cols[col.name] = dict_col continue shape = cls._column_physical_shape(col, capacity) - chunks, blocks = cls._column_chunks_blocks(col, shape) + if col.name in aligned_names: + chunks, blocks = shared_chunks, shared_blocks + else: + chunks, blocks = cls._column_chunks_blocks(col, shape) new_cols[col.name] = mem_storage.create_column( col.name, dtype=col.dtype, @@ -7150,6 +7424,49 @@ def computed_columns(self) -> dict[str, dict]: """ return dict(self._computed_cols) # shallow copy so callers can't mutate + def _aligned_grid(self) -> tuple | None: + """Return ``(chunks, blocks)`` shared by the aligned fixed-size columns. + + Inspects the actual stored 1-D NDArray columns and returns the grid + used by the largest aligned subset (the fast-path set), or ``None`` when + there are no such columns. Works for both freshly created and reopened + tables since it reads the columns' real chunk/block shapes. 
+ """ + from collections import Counter + + grids = Counter() + for name in self._stored_col_names: + col = self._cols.get(name) if hasattr(self._cols, "get") else self._cols[name] + chunks = getattr(col, "chunks", None) + blocks = getattr(col, "blocks", None) + if chunks is None or blocks is None or len(chunks) != 1: + continue + grids[(tuple(chunks), tuple(blocks))] += 1 + if not grids: + return None + (chunks, blocks), _ = grids.most_common(1)[0] + return chunks, blocks + + @property + def chunks(self) -> tuple | None: + """Chunk shape shared by the table's aligned fixed-size columns. + + ``None`` if the table has no fixed-size scalar columns. See + :attr:`blocks` for the matching block shape. + """ + grid = self._aligned_grid() + return grid[0] if grid is not None else None + + @property + def blocks(self) -> tuple | None: + """Block shape shared by the table's aligned fixed-size columns. + + ``None`` if the table has no fixed-size scalar columns. See + :attr:`chunks` for the matching chunk shape. + """ + grid = self._aligned_grid() + return grid[1] if grid is not None else None + def _ensure_generated_column_not_stale(self, name: str) -> None: meta = self._root_table._materialized_cols.get(name) if meta is not None and meta.get("stale", False): @@ -8237,7 +8554,7 @@ def _nested_namespace(self, prefix: str): for name in self.col_names: parts = split_field_path(name) if parts[: len(prefix_parts)] == prefix_parts and len(parts) > len(prefix_parts): - return _NestedColumnNamespace(self, prefix) + return NestedColumn(self, prefix) return None def __getattr__(self, s: str): @@ -8781,12 +9098,17 @@ def _empty_copy(self, capacity: int | None = None) -> CTable: capacity = max(capacity if capacity is not None else self._n_rows, 1) default_chunks, default_blocks = compute_chunks_blocks((capacity,)) + # Align fixed-size scalar columns (and the _valid_rows mask) on one + # shared grid so lazy expressions over them take the fast_eval path. 
+ shared_chunks, shared_blocks, aligned_names = self._compute_aligned_grid( + self._schema.columns, capacity + ) mem_storage = InMemoryTableStorage() new_valid = mem_storage.create_valid_rows( shape=(capacity,), - chunks=default_chunks, - blocks=default_blocks, + chunks=shared_chunks if shared_chunks is not None else default_chunks, + blocks=shared_blocks if shared_blocks is not None else default_blocks, ) new_cols = {} for col in self._schema.columns: @@ -8820,7 +9142,10 @@ def _empty_copy(self, capacity: int | None = None) -> CTable: chunks = col_storage["chunks"] blocks = col_storage["blocks"] if col.config.chunks is None and col.config.blocks is None: - chunks, blocks = self._column_chunks_blocks(col, shape) + if col.name in aligned_names: + chunks, blocks = shared_chunks, shared_blocks + else: + chunks, blocks = self._column_chunks_blocks(col, shape) new_cols[col.name] = mem_storage.create_column( col.name, dtype=col.dtype, @@ -9005,17 +9330,15 @@ def info_items(self) -> list[tuple[str, object]]: items = [ ("type", self.__class__.__name__), ("storage", storage_type), - ("rows", self.nrows), - ("columns", self.ncols), ("view", self.base is not None), + ("nrows", self.nrows), + ("ncols", self.ncols), + ("chunks", self.chunks if self.chunks is not None else "none (no fixed-size columns)"), + ("blocks", self.blocks if self.blocks is not None else "none (no fixed-size columns)"), ("nbytes", format_nbytes_info(self.nbytes)), ("cbytes", format_nbytes_info(self.cbytes)), - ("cratio", f"{self.cratio:.1f}x"), + ("cratio", f"{self.cratio:.2f}x"), ("schema", schema_summary), - ( - "valid_rows_mask", - f"cbytes={format_nbytes_info(self._valid_rows.cbytes)}", - ), ("indexes", index_summary if index_summary else "none"), ] if urlpath is not None: diff --git a/src/blosc2/ctable_storage.py b/src/blosc2/ctable_storage.py index f6313b77..966ed7f2 100644 --- a/src/blosc2/ctable_storage.py +++ b/src/blosc2/ctable_storage.py @@ -227,11 +227,20 @@ def 
create_varlen_scalar_column(self, name, *, spec, cparams=None, dparams=None) def open_varlen_scalar_column(self, name, spec): raise RuntimeError("In-memory tables have no on-disk representation to open.") - def create_dictionary_column(self, name, *, spec, cparams=None, dparams=None): + def create_dictionary_column( + self, + name, + *, + spec, + cparams=None, + dparams=None, + codes_shape=(4096,), + codes_chunks=(4096,), + codes_blocks=(256,), + ): from blosc2.schema import VLStringSpec - chunks, blocks = (4096,), (256,) - codes = blosc2.zeros((4096,), dtype=np.int32, chunks=chunks, blocks=blocks) + codes = blosc2.zeros(codes_shape, dtype=np.int32, chunks=codes_chunks, blocks=codes_blocks) dict_store = _ScalarVarLenArray(VLStringSpec(nullable=False)) return DictionaryColumn(spec, codes, dict_store) @@ -509,16 +518,26 @@ def open_varlen_scalar_column(self, name: str, spec) -> _ScalarVarLenArray: _validate_role_metadata(backend, spec) return _ScalarVarLenArray(spec, backend) - def create_dictionary_column(self, name, *, spec, cparams=None, dparams=None) -> DictionaryColumn: + def create_dictionary_column( + self, + name, + *, + spec, + cparams=None, + dparams=None, + codes_shape=(4096,), + codes_chunks=(4096,), + codes_blocks=(256,), + ) -> DictionaryColumn: from blosc2.schema import VLStringSpec # Codes: stored as a regular NDArray under _cols/name codes = self.create_column( name, dtype=np.int32, - shape=(4096,), - chunks=(4096,), - blocks=(256,), + shape=codes_shape, + chunks=codes_chunks, + blocks=codes_blocks, cparams=cparams, dparams=dparams, ) @@ -1009,15 +1028,18 @@ def create_dictionary_column( spec, cparams=None, dparams=None, + codes_shape=(4096,), + codes_chunks=(4096,), + codes_blocks=(256,), ) -> DictionaryColumn: from blosc2.schema import VLStringSpec codes = self.create_column( name, dtype=np.int32, - shape=(4096,), - chunks=(4096,), - blocks=(256,), + shape=codes_shape, + chunks=codes_chunks, + blocks=codes_blocks, cparams=cparams, dparams=dparams, 
) diff --git a/src/blosc2/dictionary_column.py b/src/blosc2/dictionary_column.py index f9148d07..58b8586b 100644 --- a/src/blosc2/dictionary_column.py +++ b/src/blosc2/dictionary_column.py @@ -142,27 +142,35 @@ def extend_from_arrow(self, pa, arrow_col, pos: int, m: int, *, ordered: bool = """ local_dict = arrow_col.dictionary.to_pylist() - # Build local-code → global-code mapping. - local_to_global: dict[int, int] = {} + # Build a local-code → global-code lookup table. The chunk-local + # dictionary is small (one entry per distinct value in the batch), so + # this Python loop is cheap; the per-row translation below is vectorised. + lut = np.empty(len(local_dict), dtype=np.int32) for local_code, value in enumerate(local_dict): - if value is None: - local_to_global[local_code] = self._spec.null_code - else: - local_to_global[local_code] = self.encode(value) + lut[local_code] = self._spec.null_code if value is None else self.encode(value) if ordered and len(local_dict) > 0: self._validate_ordered_chunk_dict(local_dict) - # Translate Arrow indices to global int32 codes. - indices = arrow_col.indices.to_pylist() - global_codes = np.empty(m, dtype=np.int32) - for i, idx in enumerate(indices): - if idx is None: - if not self._spec.nullable: - raise ValueError("Dictionary column is not nullable but Arrow input contains nulls.") - global_codes[i] = self._spec.null_code + # Translate Arrow indices to global int32 codes with a single numpy + # gather (lut[indices]) instead of a per-row Python loop. 
+ indices = arrow_col.indices + if indices.null_count: + if not self._spec.nullable: + raise ValueError("Dictionary column is not nullable but Arrow input contains nulls.") + if len(lut) == 0: + global_codes = np.full(m, self._spec.null_code, dtype=np.int32) else: - global_codes[i] = local_to_global[int(idx)] + null_mask = np.asarray(indices.is_null()) + local_codes = indices.fill_null(0).to_numpy(zero_copy_only=False) + global_codes = lut[local_codes] + global_codes[null_mask] = self._spec.null_code + elif len(lut) == 0: + # No local entries and no nulls means an empty batch. + global_codes = np.empty(m, dtype=np.int32) + else: + local_codes = indices.to_numpy(zero_copy_only=False) + global_codes = lut[local_codes] self._codes[pos : pos + m] = global_codes diff --git a/src/blosc2/list_array.py b/src/blosc2/list_array.py index ec890de3..936764e8 100644 --- a/src/blosc2/list_array.py +++ b/src/blosc2/list_array.py @@ -649,7 +649,7 @@ def info_items(self) -> list: ("pending_rows", len(self._pending_cells) if self.spec.storage == "batch" else 0), ("nbytes", format_nbytes_info(self.nbytes)), ("cbytes", format_nbytes_info(self.cbytes)), - ("cratio", f"{self.cratio:.2f}"), + ("cratio", f"{self.cratio:.2f}x"), ] def to_cframe(self) -> bytes: diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 8e6186f0..7e16c25b 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -3935,23 +3935,16 @@ def info(self) -> InfoReporter: chunks : (10,) blocks : (10,) dtype : int64 - cratio : 0.73 - cparams : {'blocksize': 80, - 'clevel': 1, - 'codec': , - 'codec_meta': 0, - 'filters': [, - , - , - , - , - ], - 'filters_meta': [0, 0, 0, 0, 0, 0], - 'nthreads': 4, - 'splitmode': , - 'typesize': 8, - 'use_dict': 0} - dparams : {'nthreads': 4} + nbytes : 80 (80 B) + cbytes : 98 (98 B) + cratio : 0.82x + cparams : CParams(codec=, codec_meta=0, clevel=5, use_dict=False, typesize=8, + : nthreads=8, blocksize=80, splitmode=, + : filters=[, , , + : , , ], filters_meta=[0, 0, + 
: 0, 0, 0, 0], tuner=) + dparams : DParams(nthreads=8) + """ return InfoReporter(self) @@ -3968,7 +3961,7 @@ def info_items(self) -> list: items += [("dtype", self.dtype)] items += [("nbytes", format_nbytes_info(self.nbytes))] items += [("cbytes", format_nbytes_info(self.cbytes))] - items += [("cratio", f"{self.cratio:.2f}")] + items += [("cratio", f"{self.cratio:.2f}x")] items += [("cparams", self.cparams)] items += [("dparams", self.dparams)] return items diff --git a/src/blosc2/objectarray.py b/src/blosc2/objectarray.py index f602b7a2..f2d89ecd 100644 --- a/src/blosc2/objectarray.py +++ b/src/blosc2/objectarray.py @@ -384,7 +384,7 @@ def info_items(self) -> list: ("chunk_cbytes_avg", f"{avg_chunk_cbytes:.2f}"), ("nbytes", format_nbytes_info(self.nbytes)), ("cbytes", format_nbytes_info(self.cbytes)), - ("cratio", f"{self.cratio:.2f}"), + ("cratio", f"{self.cratio:.2f}x"), ("cparams", self.cparams), ("dparams", self.dparams), ] diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index bf1a0a2e..d4332a9f 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -533,15 +533,16 @@ def info(self) -> InfoReporter: chunksize : 24000 blocksize : 0 typesize : 1 - nbytes : 24000 - cbytes : 82 - cratio : 292.68 - cparams : CParams(codec=, codec_meta=0, clevel=1, use_dict=False, typesize=1, + nbytes : 24000 (23.44 KiB) + cbytes : 82 (82 B) + cratio : 292.68x + cparams : CParams(codec=, codec_meta=0, clevel=5, use_dict=False, typesize=1, : nthreads=8, blocksize=0, splitmode=, : filters=[, , , : , , ], filters_meta=[0, : 0, 0, 0, 0, 0], tuner=) dparams : DParams(nthreads=8) + """ return InfoReporter(self) @@ -557,7 +558,7 @@ def info_items(self) -> list: items += [("typesize", self.typesize)] items += [("nbytes", format_nbytes_info(self.nbytes))] items += [("cbytes", format_nbytes_info(self.cbytes))] - items += [("cratio", f"{self.cratio:.2f}")] + items += [("cratio", f"{self.cratio:.2f}x")] items += [("cparams", self.cparams)] items += [("dparams", self.dparams)] 
return items diff --git a/tests/ctable/test_arrow_interop.py b/tests/ctable/test_arrow_interop.py index 8bc1bde9..3d3a9b2f 100644 --- a/tests/ctable/test_arrow_interop.py +++ b/tests/ctable/test_arrow_interop.py @@ -345,5 +345,79 @@ def test_from_arrow_unsupported_type_raises(): CTable.from_arrow(at.schema, at.to_batches()) +@pytest.mark.parametrize("batch_size", [1, 7, 100, 333, 1000, 1500]) +def test_chunk_aligned_writer_matches_direct_write(batch_size): + """The import-time buffered writer reproduces a plain element-by-element + write regardless of how appends straddle chunk boundaries.""" + from blosc2.ctable import _ChunkAlignedWriter + + n = 4321 + chunk_len = 1000 + data = np.arange(n, dtype=np.float64) + + arr = blosc2.empty((n,), dtype=np.float64, chunks=(chunk_len,)) + writer = _ChunkAlignedWriter(arr, chunk_len) + for start in range(0, n, batch_size): + writer.append(data[start : start + batch_size]) + writer.flush() + + np.testing.assert_array_equal(arr[:], data) + + +def test_from_arrow_dictionary_codes_use_aligned_grid(): + """Imported dictionary columns create their int32 codes at full capacity + on the aligned grid, not the tiny 4096-row default (which caused a + create-then-resize and thousands of micro-chunks).""" + n = 500_000 + rng = np.random.default_rng(0) + labels = np.array(["alpha", "beta", "gamma", "delta", "epsilon"]) + schema = pa.schema( + [ + pa.field("a", pa.float32()), + pa.field("c", pa.dictionary(pa.int32(), pa.string())), + ] + ) + a = pa.array(rng.random(n).astype("f4")) + c = pa.array(labels[rng.integers(0, len(labels), n)]).dictionary_encode() + t = CTable.from_arrow(schema, [pa.record_batch([a, c], schema=schema)], capacity_hint=n) + + codes = t._cols["c"].codes + # Codes share the numeric column's (aligned) grid and are not micro-chunked. 
+ assert codes.chunks == t._cols["a"].chunks + assert codes.schunk.nchunks < 10 + assert list(t["c"][:5]) == c.to_pylist()[:5] + + +def test_from_arrow_variable_batches_roundtrip(): + """Variable-sized Arrow batches that straddle the column chunk grid import + losslessly (exercises the chunk-aligned write buffer).""" + + @dataclass + class Row: + a: float = blosc2.field(blosc2.float32(), default=0.0) + d: float = blosc2.field(blosc2.float64(), default=0.0) + + rng = np.random.default_rng(0) + sizes = [854_973, 996_662, 1_002_093, 145_272] # uneven, cross 1.25M chunks + n = sum(sizes) + a_all = rng.random(n).astype("f4") + d_all = -rng.random(n) + + schema = pa.schema([pa.field("a", pa.float32()), pa.field("d", pa.float64())]) + batches, off = [], 0 + for s in sizes: + batches.append( + pa.record_batch([pa.array(a_all[off : off + s]), pa.array(d_all[off : off + s])], schema=schema) + ) + off += s + + t = CTable.from_arrow(schema, batches, capacity_hint=n) + assert len(t) == n + np.testing.assert_array_equal(t._cols["a"][:], a_all) + np.testing.assert_array_equal(t._cols["d"][:], d_all) + # All rows marked valid by the single end-of-import write. 
+ assert int(blosc2.count_nonzero(t._valid_rows[:n])) == n + + if __name__ == "__main__": pytest.main(["-v", __file__]) diff --git a/tests/ctable/test_column.py b/tests/ctable/test_column.py index b97d88a5..3e8da1b5 100644 --- a/tests/ctable/test_column.py +++ b/tests/ctable/test_column.py @@ -5,6 +5,7 @@ # SPDX-License-Identifier: BSD-3-Clause ####################################################################### +import re from dataclasses import dataclass import numpy as np @@ -91,7 +92,7 @@ def test_column_info(): assert "physical_length" not in text assert "logical_shape" not in text assert "table_physical_length" not in text - assert "storage" in text + assert "backend" in text def test_dictionary_column_info(): @@ -814,7 +815,7 @@ def test_info_shows_open_mode_for_persistent_table(tmp_path): info = repr(opened.info) assert "capacity" not in info assert "read_only" not in info - assert "open_mode : r" in info + assert "open_mode : r" in info opened.close() @@ -824,13 +825,6 @@ def test_info_schema_expands_unicode_dtype_labels(): assert "U16 (Unicode)" in info -def test_info_valid_rows_mask_only_reports_cbytes(): - t = CTable(Row, new_data=DATA20) - info = repr(t.info) - assert "valid_rows_mask : cbytes=" in info - assert "valid_rows_mask : nbytes=" not in info - - def test_info_indexes_only_report_cbytes(tmp_path): @dataclass class IndexedRow: @@ -843,17 +837,18 @@ class IndexedRow: t.create_index("id", kind=blosc2.IndexKind.FULL) info = repr(t.info) - index_block = info.split("indexes :", 1)[1] + index_block = info.split("indexes :", 1)[1] assert "cbytes=" in index_block assert "nbytes=" not in index_block assert "cratio=" not in index_block -def test_info_cratio_uses_one_decimal_with_suffix(): +def test_info_cratio_uses_two_decimals_with_suffix(): t = CTable(Row, new_data=DATA20) info = repr(t.info) - assert "cratio :" in info - assert "x" in next(line for line in info.splitlines() if line.startswith("cratio")) + assert "cratio :" in info + cratio_line = 
next(line for line in info.splitlines() if line.startswith("cratio")) + assert re.search(r"cratio\s+:\s+\d+\.\d{2}x", cratio_line) # ------------------------------------------------------------------- diff --git a/tests/ctable/test_construct.py b/tests/ctable/test_construct.py index a41c7b10..a47955b6 100644 --- a/tests/ctable/test_construct.py +++ b/tests/ctable/test_construct.py @@ -197,5 +197,118 @@ def test_valid_rows(): assert blosc2.count_nonzero(table_extended._valid_rows) == 5 +def test_fixed_columns_share_aligned_grid(): + """Fixed-size scalar columns share one chunk/block grid so lazy + expressions over mixed dtypes can take the fast_eval path.""" + + @dataclass + class MixedRow: + a: float = blosc2.field(blosc2.float32(), default=0.0) + b: float = blosc2.field(blosc2.float32(), default=0.0) + c: float = blosc2.field(blosc2.float32(), default=0.0) + d: float = blosc2.field(blosc2.float64(), default=0.0) + n: int = blosc2.field(blosc2.int32(), default=0) + + table = CTable(MixedRow, expected_size=4_000_000) + grids = {(table._cols[name].chunks, table._cols[name].blocks) for name in table.col_names} + assert len(grids) == 1, f"columns are not aligned: {grids}" + + # The _valid_rows mask shares the same grid so where() keeps the fast path. + assert table._valid_rows.chunks == table._cols["a"].chunks + assert table._valid_rows.blocks == table._cols["a"].blocks + + # The table exposes the shared grid via .chunks / .blocks. 
+ assert table.chunks == table._cols["a"].chunks + assert table.blocks == table._cols["a"].blocks + assert table.chunks is not None + assert len(table.chunks) == 1 + + +def test_wide_string_column_excluded_from_aligned_grid(): + """Very wide fixed-width string columns keep per-dtype sizing instead of + inheriting the shared grid (which would produce huge chunks).""" + + @dataclass + class WideRow: + a: float = blosc2.field(blosc2.float32(), default=0.0) + d: float = blosc2.field(blosc2.float64(), default=0.0) + s: str = blosc2.field(blosc2.string(max_length=50000), default="") + + table = CTable(WideRow, expected_size=4_000_000) + # Numeric columns share the aligned grid... + assert table._cols["a"].chunks == table._cols["d"].chunks + assert table._cols["a"].blocks == table._cols["d"].blocks + # ...but the wide string column does not (it would blow up the chunk size). + assert table._cols["s"].chunks != table._cols["a"].chunks + # The reported grid reflects the aligned (numeric) set. + assert table.chunks == table._cols["a"].chunks + + +def test_small_strings_align_but_large_ones_do_not(): + """Fixed strings up to 128 bytes (U32) join the shared grid so string + filters stay on the fast path; larger ones keep per-dtype sizing.""" + + @dataclass + class StrRow: + a: float = blosc2.field(blosc2.float32(), default=0.0) + b: float = blosc2.field(blosc2.float32(), default=0.0) + s32: str = blosc2.field(blosc2.string(max_length=32), default="") # 128 bytes + s64: str = blosc2.field(blosc2.string(max_length=64), default="") # 256 bytes + + table = CTable(StrRow, expected_size=4_000_000) + numeric_grid = table._cols["a"].chunks + # Strings stay out of the median, so the grid is still sized for float32. + assert table._cols["b"].chunks == numeric_grid + # U32 (128 bytes) joins the shared grid; U64 (256 bytes) does not. 
+ assert table._cols["s32"].chunks == numeric_grid + assert table._cols["s64"].chunks != numeric_grid + + +def test_all_string_table_aligns(): + """A table with only fixed-string columns still shares one grid.""" + + @dataclass + class OnlyStr: + s: str = blosc2.field(blosc2.string(max_length=8), default="") + t: str = blosc2.field(blosc2.string(max_length=16), default="") + + table = CTable(OnlyStr, expected_size=1000) + assert table._cols["s"].chunks == table._cols["t"].chunks + assert table.chunks == table._cols["s"].chunks + + +def test_from_arrow_aligns_columns_and_mask(): + """The Arrow-import path (used by parquet-to-blosc2) aligns fixed-size + columns and the _valid_rows mask on a single shared grid.""" + pa = pytest.importorskip("pyarrow") + + n = 200_000 + rng = np.random.default_rng(0) + tbl = pa.table( + { + "tips": pa.array(rng.random(n).astype("f4")), + "km": pa.array(rng.random(n).astype("f4")), + "lon": pa.array(-rng.random(n)), # float64: previously misaligned + } + ) + table = CTable.from_arrow(tbl.schema, tbl.to_batches(), capacity_hint=n) + grids = {(table._cols[name].chunks, table._cols[name].blocks) for name in table.col_names} + grids.add((table._valid_rows.chunks, table._valid_rows.blocks)) + assert len(grids) == 1, f"from_arrow did not align grids: {grids}" + + +def test_empty_table_grid_properties(): + """A table with no fixed-size scalar columns reports None for chunks/blocks + only when there is nothing to align.""" + + @dataclass + class ScalarRow: + x: int = blosc2.field(blosc2.int64(), default=0) + + table = CTable(ScalarRow, expected_size=1000) + assert table.chunks is not None + assert table.blocks is not None + + if __name__ == "__main__": pytest.main(["-v", __file__]) diff --git a/tests/ctable/test_ctable_dataclass_schema.py b/tests/ctable/test_ctable_dataclass_schema.py index 831254f7..d3ce11b1 100644 --- a/tests/ctable/test_ctable_dataclass_schema.py +++ b/tests/ctable/test_ctable_dataclass_schema.py @@ -348,7 +348,7 @@ class 
NestedRow: assert t.trip.col_names == ["begin.lon", "begin.lat"] text = repr(info) - assert "NestedColumnNamespace" in text + assert "NestedColumn" in text assert "storage" in text assert "schema" in text assert "begin.lon" in text diff --git a/tests/test_b2view_model.py b/tests/test_b2view_model.py index 5985af1f..bb0e7b08 100644 --- a/tests/test_b2view_model.py +++ b/tests/test_b2view_model.py @@ -65,7 +65,7 @@ def test_store_browser_metadata_and_previews(tmp_path): table_info = browser.get_info("/table") assert table_info.kind == "ctable" - assert table_info.metadata["rows"] == 6 + assert table_info.metadata["nrows"] == 6 preview = browser.preview("/table", max_rows=3, max_cols=1) assert preview["columns"] == ["x"] assert preview["hidden_columns"] == 1 @@ -83,7 +83,7 @@ def test_store_browser_supports_standalone_ctable(tmp_path): assert browser.list_children("/") == [] info = browser.get_info("/") assert info.kind == "ctable" - assert info.metadata["rows"] == 4 + assert info.metadata["nrows"] == 4 preview = browser.preview("/", max_rows=2) np.testing.assert_array_equal(preview["data"]["x"], np.array([0, 1]))