diff --git a/roborock/map/b01_grid_layers.py b/roborock/map/b01_grid_layers.py new file mode 100644 index 00000000..410d60fd --- /dev/null +++ b/roborock/map/b01_grid_layers.py @@ -0,0 +1,253 @@ +"""Device-agnostic decomposition of a B01 occupancy grid into map layers. + +Both the Q10 (custom binary) and Q7 (SCMap protobuf) deliver their map as a +single-byte-per-cell occupancy grid where the cell *value* encodes a semantic +class (background / wall / per-room floor / ...). This module turns such a grid +into separable **layers** a frontend can stack, without knowing the device's +specific value encoding -- the caller supplies a ``classifier`` mapping a cell +value to a class name, plus the room metadata. + +Coordinates here are **grid-pixel** space (origin top-left of the raw grid, before +any rendering flip/scale). Vector overlays in world/robot coordinates (path, +zones, ...) are placed into this same space by the device's calibration. +""" + +import io +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from math import ceil + +from PIL import Image + +# Canonical layer class names. Devices map their raw cell values onto these. +LAYER_BACKGROUND = "background" +LAYER_WALL = "wall" +LAYER_FLOOR = "floor" +LAYER_UNKNOWN = "unknown" + +_PNG = "PNG" + + +@dataclass +class RoomLayer: + """A single room (segment) and where its pixels sit in the grid.""" + + id: int + name: str + pixel_value: int + pixel_count: int + bbox: tuple[int, int, int, int] + """Inclusive ``(min_x, min_y, max_x, max_y)`` bounding box in grid pixels.""" + + @property + def center(self) -> tuple[float, float]: + """Center of the bounding box in grid-pixel space (for label placement).""" + min_x, min_y, max_x, max_y = self.bbox + return ((min_x + max_x) / 2, (min_y + max_y) / 2) + + +@dataclass +class GridLayers: + """Separable layers decomposed from a single occupancy grid. + + Holds a reference to the raw ``grid`` plus the classifier, and renders any + layer to a transparent RGBA PNG on demand (so we don't eagerly build a mask + per room). ``class_counts`` reports how many cells fall in each class. + """ + + width: int + height: int + grid: bytes + rooms: list[RoomLayer] + classifier: Callable[[int], str] + class_counts: dict[str, int] = field(default_factory=dict) + flip: bool = True + """Whether display rendering flips the grid top-to-bottom. Devices whose grid + is stored bottom-up (V1/Q7 convention) flip; the Q10 grid is stored top-down, + so it does not. Used as the default for the ``render_*`` methods so every + layer matches the device's composited map orientation.""" + + def cell_class(self, value: int) -> str: + """Classify a single raw cell value into a canonical layer name.""" + return self.classifier(value) + + def render_mask( + self, + predicate: Callable[[int], bool], + color: tuple[int, int, int, int], + *, + scale: int = 1, + flip: bool | None = None, + ) -> bytes: + """Render cells matching ``predicate`` as ``color`` over transparency. + + ``flip`` applies the same top-to-bottom flip the composited map uses so + layers line up pixel-for-pixel (defaults to the device's :attr:`flip`); + ``scale`` nearest-neighbour upsamples. + """ + if flip is None: + flip = self.flip + transparent = (0, 0, 0, 0) + px = bytearray() + for value in self.grid: + px.extend(color if predicate(value) else transparent) + img = Image.frombytes("RGBA", (self.width, self.height), bytes(px)) + if flip: + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + if scale > 1: + img = img.resize((self.width * scale, self.height * scale), Image.Resampling.NEAREST) + buf = io.BytesIO() + img.save(buf, format=_PNG) + return buf.getvalue() + + def render_class( + self, layer: str, color: tuple[int, int, int, int], *, scale: int = 1, flip: bool | None = None + ) -> bytes: + """Render a whole class layer (e.g. ``"wall"``) to an RGBA PNG.""" + return self.render_mask(lambda v: self.classifier(v) == layer, color, scale=scale, flip=flip) + + def render_room( + self, room_id: int, color: tuple[int, int, int, int], *, scale: int = 1, flip: bool | None = None + ) -> bytes: + """Render a single room's pixels to an RGBA PNG.""" + room = next((r for r in self.rooms if r.id == room_id), None) + if room is None: + raise KeyError(f"No room with id {room_id}") + target = room.pixel_value + return self.render_mask(lambda v: v == target, color, scale=scale, flip=flip) + + +@dataclass +class GridCalibration: + """Affine transform between device world coordinates and grid pixels. + + ``resolution`` is world-units per pixel. The Y axis is flipped between world + and grid space via ``y_sign`` (devices typically have world-Y increasing + upward while the grid's Y increases downward). + """ + + resolution: float + origin_x: float + origin_y: float + y_sign: int = 1 + + def world_to_pixel(self, x: float, y: float) -> tuple[float, float]: + """Map a world ``(x, y)`` to grid-pixel ``(px, py)``.""" + return (x / self.resolution + self.origin_x, self.origin_y - self.y_sign * y / self.resolution) + + def pixel_to_world(self, px: float, py: float) -> tuple[float, float]: + """Map a grid-pixel ``(px, py)`` back to world ``(x, y)``.""" + return ((px - self.origin_x) * self.resolution, self.y_sign * (self.origin_y - py) * self.resolution) + + +def solve_calibration( + layers: GridLayers, + points: list[tuple[float, float]], + *, + resolutions: Iterable[float], + y_signs: Iterable[int] = (1, -1), +) -> GridCalibration | None: + """Fit a :class:`GridCalibration` by overlaying ``points`` onto the floor. + + A cleaning path must lie on floor (not walls/background). For each candidate + resolution and Y orientation we slide the path's pixel bounding box across + the map and keep the placement that maximises on-floor points while + penalising points landing on walls/background. Returns ``None`` if no + placement lands a clear majority of points on floor. + + The search is bounded: the path bbox size fixes how far it can slide, so the + offset range is small. Needs a reasonably dense, shape-rich path (a long + clean) to be well-constrained. + """ + if not points: + return None + w, h = layers.width, layers.height + classify = layers.classifier + # 1 = floor, 2 = wall/background (blocked), 0 = other. Index by cell. + klass = bytes( + 1 if (c := classify(v)) == LAYER_FLOOR else 2 if c in (LAYER_WALL, LAYER_BACKGROUND) else 0 for v in layers.grid + ) + + best: tuple[float, GridCalibration] | None = None + for resolution in resolutions: + if resolution <= 0: + continue + for y_sign in y_signs: + sx = [x / resolution for x, _ in points] + sy = [y_sign * y / resolution for _, y in points] + min_sx, max_sx, min_sy, max_sy = min(sx), max(sx), min(sy), max(sy) + if (max_sx - min_sx) >= w or (max_sy - min_sy) >= h: + continue # path wider/taller than the map at this resolution + pts = list(zip(sx, sy)) + # Slide so every point stays in-bounds: px = px_f + ox in [0, w), py = oy - py_f in [0, h). + for ox in range(ceil(-min_sx), ceil(w - max_sx)): + for oy in range(ceil(max_sy), ceil(h + min_sy)): + on_floor = 0 + blocked = 0 + for px_f, py_f in pts: + cell = int(oy - py_f) * w + int(px_f + ox) + k = klass[cell] + if k == 1: + on_floor += 1 + elif k == 2: + blocked += 1 + score = on_floor - 1.5 * blocked + if best is None or score > best[0]: + best = (score, GridCalibration(float(resolution), float(ox), float(oy), y_sign)) + + if best is None or best[0] < len(points) * 0.5: + return None + return best[1] + + +def decompose_grid( + width: int, + height: int, + grid: bytes, + rooms: Iterable[tuple[int, str, int, int]], + classifier: Callable[[int], str], + *, + flip: bool = True, +) -> GridLayers: + """Build :class:`GridLayers` from a grid + room records + a classifier. + + ``rooms`` items are ``(id, name, pixel_value, pixel_count)`` tuples. Per-room + bounding boxes are computed in one pass over the grid. ``flip`` records the + device's display orientation (see :attr:`GridLayers.flip`). + """ + room_meta = list(rooms) + bboxes: dict[int, list[int]] = {pv: [width, height, -1, -1] for (_, _, pv, _) in room_meta} + counts: dict[str, int] = {} + for index, value in enumerate(grid): + cls = classifier(value) + counts[cls] = counts.get(cls, 0) + 1 + box = bboxes.get(value) + if box is not None: + x = index % width + y = index // width + if x < box[0]: + box[0] = x + if y < box[1]: + box[1] = y + if x > box[2]: + box[2] = x + if y > box[3]: + box[3] = y + + room_layers: list[RoomLayer] = [] + for room_id, name, pixel_value, pixel_count in room_meta: + box = bboxes[pixel_value] + bbox = (box[0], box[1], box[2], box[3]) if box[2] >= 0 else (0, 0, 0, 0) + room_layers.append( + RoomLayer(id=room_id, name=name, pixel_value=pixel_value, pixel_count=pixel_count, bbox=bbox) + ) + + return GridLayers( + width=width, + height=height, + grid=grid, + rooms=room_layers, + classifier=classifier, + class_counts=counts, + flip=flip, + ) diff --git a/roborock/map/b01_q10_overlays.py b/roborock/map/b01_q10_overlays.py new file mode 100644 index 00000000..e371fdaa --- /dev/null +++ b/roborock/map/b01_q10_overlays.py @@ -0,0 +1,89 @@ +"""Decoders for Q10 (B01/ss07) map vector overlays. + +No-go zones, no-mop zones, virtual walls and zoned-clean areas are not part of +the map raster; the device reports them as base64-encoded blobs in separate data +points (``dpRestrictedZoneUp`` 55, ``dpVirtualWallUp`` 57, ``dpZonedUp`` 59). + +The blob format was reverse-engineered from a live ss07 (confirmed against 7 +real no-go zones): + + [version: u8][count: u8] then ``count`` fixed-size records, each: + [type: u8][vertex_count: u8] then vertex_count (x, y) int16-BE pairs, + zero-padded to the record size. + +Coordinates are in the device's world units (the same space as the cleaning +path), so a :class:`~roborock.map.b01_grid_layers.GridCalibration` maps them to +map pixels. ``type`` distinguishes the restriction kind (0 = no-go, 3 = no-mop +observed); it is preserved verbatim so callers can route polygons to the right +``MapData`` layer. +""" + +import base64 +from dataclasses import dataclass, field + +_DEFAULT_RECORD_SIZE = 38 # 2-byte record header + up to 9 (x, y) int16 pairs + + +@dataclass +class Q10Zone: + """A polygon overlay (no-go / no-mop / virtual wall) in world coordinates.""" + + type: int + vertices: list[tuple[int, int]] = field(default_factory=list) + + +def _as_bytes(data: bytes | str | None) -> bytes: + if data is None: + return b"" + if isinstance(data, bytes): + return data + try: + return base64.b64decode(data + "=" * (-len(data) % 4)) + except (ValueError, base64.binascii.Error): # type: ignore[attr-defined] + return b"" + + +def parse_zone_blob(data: bytes | str | None) -> list[Q10Zone]: + """Decode a Q10 zone/wall overlay blob into a list of :class:`Q10Zone`. + + Accepts the raw bytes or the base64 string straight from the data point. + Returns ``[]`` for empty/absent/unparsable blobs (the device sends a single + ``0x00`` byte when there are none). + """ + raw = _as_bytes(data) + if len(raw) < 2: + return [] + count = raw[1] + if count <= 0: + return [] + + body = raw[2:] + record_size = len(body) // count if count and len(body) % count == 0 else _DEFAULT_RECORD_SIZE + if record_size < 2: + return [] + + zones: list[Q10Zone] = [] + for index in range(count): + record = body[index * record_size : (index + 1) * record_size] + if len(record) < 2: + break + zone_type = record[0] + vertex_count = record[1] + needed = 2 + vertex_count * 4 + if needed > len(record): + continue # malformed record; skip rather than misread padding + vertices = [ + ( + int.from_bytes(record[2 + j * 4 : 4 + j * 4], "big", signed=True), + int.from_bytes(record[4 + j * 4 : 6 + j * 4], "big", signed=True), + ) + for j in range(vertex_count) + ] + zones.append(Q10Zone(type=zone_type, vertices=vertices)) + return zones + + +# Observed ``type`` values. 0 = no-go (vacuum forbidden), 3 = no-mop. Others are +# surfaced verbatim on Q10Zone.type for callers that recognise them. +ZONE_TYPE_NO_GO = 0 +ZONE_TYPE_NO_MOP = 3 diff --git a/tests/map/test_b01_grid_layers.py b/tests/map/test_b01_grid_layers.py new file mode 100644 index 00000000..2299ac3f --- /dev/null +++ b/tests/map/test_b01_grid_layers.py @@ -0,0 +1,119 @@ +"""Tests for the device-agnostic grid->layers decomposition + calibration.""" + +import io + +from PIL import Image + +from roborock.map.b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_WALL, + GridCalibration, + decompose_grid, + solve_calibration, +) + + +def test_decompose_grid_generic_classifier_and_bbox() -> None: + """A hand-built grid decomposes into the right classes with room bboxes.""" + # 4x3 grid: row0 walls(9), row1 = floor room 5 (value 20) at x1..2, row2 background(7) + grid = bytes( + [ + 9, + 9, + 9, + 9, + 0, + 20, + 20, + 0, + 7, + 7, + 7, + 7, + ] + ) + + def classify(v: int) -> str: + if v == 9: + return LAYER_WALL + if v == 7: + return LAYER_BACKGROUND + if v == 0: + return "unknown" + return LAYER_FLOOR + + layers = decompose_grid(4, 3, grid, [(5, "Office", 20, 2)], classify) + assert layers.class_counts == {LAYER_WALL: 4, "unknown": 2, LAYER_FLOOR: 2, LAYER_BACKGROUND: 4} + assert len(layers.rooms) == 1 + room = layers.rooms[0] + assert room.id == 5 and room.name == "Office" and room.pixel_value == 20 + assert room.bbox == (1, 1, 2, 1) # the two floor cells, row 1, x in 1..2 + assert room.center == (1.5, 1.0) + + +def test_render_mask_produces_transparent_rgba() -> None: + grid = bytes([0, 20, 20, 0]) + + def classify(v: int) -> str: + return LAYER_FLOOR if v == 20 else "unknown" + + layers = decompose_grid(4, 1, grid, [(5, "Office", 20, 2)], classify) + png = layers.render_class(LAYER_FLOOR, (10, 20, 30, 255), flip=False) + img = Image.open(io.BytesIO(png)) + assert img.mode == "RGBA" and img.size == (4, 1) + pixels = list(img.getdata()) + assert pixels == [(0, 0, 0, 0), (10, 20, 30, 255), (10, 20, 30, 255), (0, 0, 0, 0)] + + +def test_render_scale_upsamples() -> None: + layers = decompose_grid(2, 1, bytes([20, 20]), [(5, "R", 20, 2)], lambda v: LAYER_FLOOR) + png = layers.render_class(LAYER_FLOOR, (1, 2, 3, 4), scale=3) + assert Image.open(io.BytesIO(png)).size == (6, 3) + + +def test_calibration_roundtrip() -> None: + cal = GridCalibration(resolution=2.0, origin_x=3.0, origin_y=8.0, y_sign=1) + assert cal.world_to_pixel(0, 0) == (3.0, 8.0) + assert cal.world_to_pixel(10, 10) == (8.0, 3.0) # +x right, +y up (flipped) + back = cal.pixel_to_world(*cal.world_to_pixel(6, -4)) + assert back == (6, -4) + + +def _floor_block_layers(): + """12x12 grid with a 6x6 floor block at x[3..8], y[3..8]; rest background.""" + grid = bytearray([9] * 144) # 9 = background + for y in range(3, 9): + for x in range(3, 9): + grid[y * 12 + x] = 1 # 1 = floor + classify = {1: LAYER_FLOOR, 9: LAYER_BACKGROUND} + return decompose_grid(12, 12, bytes(grid), [], lambda v: classify.get(v, "other")) + + +def test_solve_calibration_recovers_known_transform() -> None: + """A 2D-spread path over a floor block pins resolution + origin uniquely.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + # World points chosen to land across the floor block (2D spread, not 1D). + pixels = [(3, 8), (8, 8), (3, 3), (8, 3), (5, 5), (6, 6), (4, 7), (7, 4)] + points = [true.pixel_to_world(px, py) for px, py in pixels] + cal = solve_calibration(layers, points, resolutions=[1.0, 1.5, 2.0, 2.5, 3.0]) + assert cal is not None + assert cal.resolution == 2.0 + assert (cal.origin_x, cal.origin_y, cal.y_sign) == (3.0, 8.0, 1) + + +def test_solve_calibration_considers_right_edge_fit() -> None: + """A valid placement is not skipped when the path bbox reaches the map edge.""" + layers = decompose_grid(4, 2, bytes([1] * 8), [], lambda v: LAYER_FLOOR) + points = [(0.0, 0.0), (3.2, 0.0), (6.4, 0.0)] + cal = solve_calibration(layers, points, resolutions=[2.0], y_signs=[1]) + assert cal is not None + assert (cal.origin_x, cal.origin_y, cal.y_sign) == (0.0, 0.0, 1) + + +def test_solve_calibration_returns_none_when_unfittable() -> None: + layers = _floor_block_layers() + # Points so far apart no resolution keeps them on the 6x6 floor block. + points = [(0.0, 0.0), (1000.0, 0.0), (0.0, 1000.0)] + assert solve_calibration(layers, points, resolutions=[2.0]) is None diff --git a/tests/map/test_b01_q10_overlays.py b/tests/map/test_b01_q10_overlays.py new file mode 100644 index 00000000..9a3ce3db --- /dev/null +++ b/tests/map/test_b01_q10_overlays.py @@ -0,0 +1,58 @@ +"""Tests for the Q10 vector-overlay (no-go / no-mop / virtual wall) decoder.""" + +import base64 + +from roborock.map.b01_q10_overlays import ( + ZONE_TYPE_NO_GO, + ZONE_TYPE_NO_MOP, + parse_zone_blob, +) + + +def _blob(version: int, records: list[bytes], record_size: int = 18) -> bytes: + body = b"".join(r.ljust(record_size, b"\x00") for r in records) + return bytes([version, len(records)]) + body + + +def _rect(zone_type: int, corners: list[tuple[int, int]]) -> bytes: + out = bytes([zone_type, len(corners)]) + for x, y in corners: + out += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + return out + + +def test_parse_zone_blob_two_typed_rectangles() -> None: + rect_a = _rect(ZONE_TYPE_NO_GO, [(0, 0), (10, 0), (10, 10), (0, 10)]) + rect_b = _rect(ZONE_TYPE_NO_MOP, [(-5, -5), (5, -5), (5, 5), (-5, 5)]) + zones = parse_zone_blob(_blob(1, [rect_a, rect_b])) + assert [z.type for z in zones] == [ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP] + assert zones[0].vertices == [(0, 0), (10, 0), (10, 10), (0, 10)] + assert zones[1].vertices == [(-5, -5), (5, -5), (5, 5), (-5, 5)] # signed coords + + +def test_parse_zone_blob_accepts_base64() -> None: + blob = _blob(1, [_rect(ZONE_TYPE_NO_GO, [(1, 2), (3, 4), (5, 6), (7, 8)])]) + zones = parse_zone_blob(base64.b64encode(blob).decode()) + assert len(zones) == 1 and zones[0].vertices[2] == (5, 6) + + +def test_parse_zone_blob_empty_variants() -> None: + assert parse_zone_blob(None) == [] + assert parse_zone_blob(b"\x00") == [] # device's "no zones" sentinel + assert parse_zone_blob("AA==") == [] # base64 of 0x00 + assert parse_zone_blob(bytes([1, 0, 0])) == [] # version=1, count=0 + + +def test_parse_zone_blob_skips_malformed_record() -> None: + # vertex_count claims 9 verts (needs 38 bytes) but record is only 18 -> skipped. + bad = bytes([ZONE_TYPE_NO_GO, 9]) + b"\x00" * 16 + good = _rect(ZONE_TYPE_NO_GO, [(1, 1), (2, 2), (3, 3), (4, 4)]) + zones = parse_zone_blob(_blob(1, [bad, good])) + assert len(zones) == 1 and zones[0].vertices[0] == (1, 1) + + +def test_parse_zone_blob_real_record_size_inferred() -> None: + """Record size is inferred from total/count (real device uses 38).""" + rect = _rect(ZONE_TYPE_NO_GO, [(100, 200), (300, 200), (300, 50), (100, 50)]) + zones = parse_zone_blob(_blob(1, [rect], record_size=38)) + assert len(zones) == 1 and zones[0].vertices[0] == (100, 200)