diff --git a/roborock/cli.py b/roborock/cli.py index b36b11ce..663d386d 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -50,6 +50,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute @@ -521,6 +522,46 @@ async def maps(ctx, device_id: str): await _display_v1_trait(context, device_id, lambda v1: v1.maps) +# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to +# ~once per 60-70s, so a single request is answered quickly but rapid re-requests +# may not be. This bounds how long a one-shot CLI command waits for that push. +_Q10_MAP_PUSH_TIMEOUT = 30.0 + + +async def _await_q10_map_push( + properties: Q10PropertiesApi, + predicate: Callable[[], bool], + *, + timeout: float = _Q10_MAP_PUSH_TIMEOUT, +) -> bool: + """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. + + The Q10 map API is entirely push-driven: there is no synchronous get-map + request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, + which the device's subscribe loop feeds into the map trait. Here we register + an update listener, send the request, and wait for the pushed data to satisfy + ``predicate``. Returns whether it did within ``timeout``. + """ + if predicate(): + return True + loop = asyncio.get_running_loop() + updated: asyncio.Future[None] = loop.create_future() + + def on_update() -> None: + if predicate() and not updated.done(): + updated.set_result(None) + + unsub = properties.map.add_update_listener(on_update) + try: + await properties.refresh() + await asyncio.wait_for(updated, timeout=timeout) + return True + except TimeoutError: + return False + finally: + unsub() + + @session.command() @click.option("--device_id", required=True) @click.option("--output-file", required=True, help="Path to save the map image.") @@ -529,15 +570,76 @@ async def maps(ctx, device_id: str): async def map_image(ctx, device_id: str, output_file: str): """Get device map image and save it to a file.""" context: RoborockContext = ctx.obj - trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) - if trait.image_content: + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + image_content = properties.map.image_content + else: + v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) + image_content = v1_trait.image_content + if image_content: with open(output_file, "wb") as f: - f.write(trait.image_content) + f.write(image_content) click.echo(f"Map image saved to {output_file}") else: click.echo("No map image content available.") +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-dir", default=None, help="If set, write one transparent PNG per layer here.") +@click.pass_context +@async_command +async def q10_map_layers(ctx, device_id: str, output_dir: str | None): + """List the Q10 map's separable layers (background/wall/floor/per-room). + + With --output-dir, also exports each layer as a transparent PNG that can be + stacked in a frontend (background, then floor, then walls, then each room). + """ + import os + + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.layers is not None) + layers = properties.map.layers + if layers is None: + click.echo("No map layers available.") + return + + summary = { + "size": {"width": layers.width, "height": layers.height}, + "class_counts": layers.class_counts, + "rooms": [ + {"id": r.id, "name": r.name, "pixel_count": r.pixel_count, "bbox": list(r.bbox)} for r in layers.rooms + ], + } + click.echo(dump_json(summary)) + + if output_dir: + os.makedirs(output_dir, exist_ok=True) + exports = { + "background": layers.render_class("background", (210, 210, 215, 255), scale=2), + "floor": layers.render_class("floor", (70, 170, 95, 200), scale=2), + "wall": layers.render_class("wall", (20, 20, 25, 255), scale=2), + } + for name, png in exports.items(): + with open(os.path.join(output_dir, f"layer_{name}.png"), "wb") as f: + f.write(png) + for room in layers.rooms: + png = layers.render_room(room.id, (90, 140, 220, 200), scale=2) + safe = "".join(c if c.isalnum() else "_" for c in room.name) or f"room{room.id}" + with open(os.path.join(output_dir, f"room_{room.id}_{safe}.png"), "wb") as f: + f.write(png) + click.echo(f"Wrote {3 + len(layers.rooms)} layer PNGs to {output_dir}") + + @session.command() @click.option("--device_id", required=True) @click.option("--include_path", is_flag=True, default=False, help="Include path data in the output.") @@ -564,6 +666,75 @@ async def map_data(ctx, device_id: str, include_path: bool): click.echo(dump_json(data_summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.") +@click.pass_context +@async_command +async def q10_position(ctx, device_id: str, include_path: bool): + """Get the current Q10 robot position and live cleaning path. + + The Q10 only streams its position/path while it is actively cleaning, so this + will report that no live trace is available for an idle/docked robot. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path)) + if not got_trace: + click.echo("No live trace available (the robot only reports position while cleaning).") + return + map_trait = properties.map + position = map_trait.robot_position + summary: dict[str, Any] = { + "robot_position": {"x": position.x, "y": position.y} if position else None, + "path_points": len(map_trait.path), + } + if include_path: + summary["path"] = [[p.x, p.y] for p in map_trait.path] + click.echo(dump_json(summary)) + + +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-file", required=True, help="Path to save the map image with the path drawn.") +@click.pass_context +@async_command +async def q10_map_with_path(ctx, device_id: str, output_file: str): + """Render the Q10 map with the current cleaning path + robot position drawn. + + Needs the robot to be actively cleaning (the path/calibration come from the + live trace). Fetches the map and the path, solves the world<->pixel + calibration, and writes the annotated PNG. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + map_trait = properties.map + await _await_q10_map_push(properties, lambda: map_trait.image_content is not None) + got_path = await _await_q10_map_push(properties, lambda: bool(map_trait.path)) + if not got_path: + click.echo("No live path available (the robot only reports its path while cleaning).") + return + try: + image = map_trait.render_path_on_map() + except RoborockException as err: + click.echo(f"Could not render path on map: {err}") + return + with open(output_file, "wb") as f: + f.write(image) + cal = map_trait.calibration + click.echo(f"Saved map with {len(map_trait.path)}-point path to {output_file} (calibration: {cal})") + + @session.command() @click.option("--device_id", required=True) @click.pass_context @@ -705,7 +876,16 @@ async def set_child_lock(ctx, device_id: str, enabled: bool): async def rooms(ctx, device_id: str): """Get device room mapping info.""" context: RoborockContext = ctx.obj - await _display_v1_trait(context, device_id, lambda v1: v1.rooms) + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + # A valid map may have no room records, so wait on the map arriving + # (image_content) rather than on rooms being non-empty. + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) + else: + await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @session.command() diff --git a/roborock/data/b01_q10/b01_q10_containers.py b/roborock/data/b01_q10/b01_q10_containers.py index 393eb231..86154b6b 100644 --- a/roborock/data/b01_q10/b01_q10_containers.py +++ b/roborock/data/b01_q10/b01_q10_containers.py @@ -108,3 +108,7 @@ class Q10Status(RoborockBase): back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE}) cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_PROGRESS}) fault: int | None = field(default=None, metadata={"dps": B01_Q10_DP.FAULT}) + # Raw base64 map-overlay blobs (decoded by roborock.map.b01_q10_overlays). + restricted_zone_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.RESTRICTED_ZONE_UP}) + virtual_wall_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.VIRTUAL_WALL_UP}) + zoned_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.ZONED_UP}) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 1e0510ba..50ec8e5a 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -19,7 +19,12 @@ async def stream_decoded_responses( mqtt_channel: MqttChannel, ) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT.""" + """Stream decoded DPS messages received via MQTT. + + Messages that are not decodable DPS responses (e.g. protocol-301 + ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw + messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. + """ async for response_message in mqtt_channel.subscribe_stream(): try: diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..9aac42f7 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,17 +4,21 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.roborock_message import RoborockMessage from .command import CommandTrait +from .map import MapContentTrait from .remote import RemoteTrait from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "MapContentTrait", ] _LOGGER = logging.getLogger(__name__) @@ -35,6 +39,9 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + map: MapContentTrait + """Trait for fetching the current parsed map (image + rooms).""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel @@ -42,6 +49,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() + self.map = MapContentTrait() self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: @@ -65,14 +73,40 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop to listen for status updates.""" - async for decoded_dps in stream_decoded_responses(self._channel): - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + """Persistent loop dispatching pushed messages to the read-model traits.""" + async for message in self._channel.subscribe_stream(): + self._handle_message(message) + + def _handle_message(self, message: RoborockMessage) -> None: + """Route a single pushed message to the trait responsible for it. + + Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not + DPS), so they are handled separately from the status DPS stream. The Q10 + is entirely push-driven: there is no synchronous get-map request, the + device just publishes its current map (a ``dpRequestDps`` nudges it to). + """ + if self.map.update_from_map_response(message): + return + + try: + decoded_dps = decode_rpc_response(message) + except RoborockException as ex: + _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) + return + + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + # Notify all traits about a new message and each trait will + # only update what fields that it is responsible for. + # More traits can be added here below. + self.status.update_from_dps(decoded_dps) + + # Feed the map's vector-overlay data points (no-go zones / virtual + # walls) to the map trait so they are decoded as they arrive. + if B01_Q10_DP.RESTRICTED_ZONE_UP in decoded_dps or B01_Q10_DP.VIRTUAL_WALL_UP in decoded_dps: + self.map.load_overlays( + restricted_zone_up=decoded_dps.get(B01_Q10_DP.RESTRICTED_ZONE_UP), + virtual_wall_up=decoded_dps.get(B01_Q10_DP.VIRTUAL_WALL_UP), + ) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py new file mode 100644 index 00000000..6561f3d6 --- /dev/null +++ b/roborock/devices/traits/b01/q10/map.py @@ -0,0 +1,372 @@ +"""Map content trait for B01 Q10 devices. + +Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this +trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: + +- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` + messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` + subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. +- ``update_from_map_response`` parses the payload, updates the cached fields and + notifies update listeners (register via :meth:`add_update_listener`). +- ``parse_map_content()`` reparses the cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and + ``raw_api_response`` are readable and reflect the most recently pushed map. + +Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. +""" + +import io +import logging +from dataclasses import dataclass, field + +from PIL import Image, ImageDraw +from vacuum_map_parser_base.map_data import Area, MapData, Path, Point, Wall + +from roborock.data import RoborockBase +from roborock.devices.traits.common import TraitUpdateListener +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import GridCalibration, GridLayers, solve_calibration +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10EraseZone, + Q10Point, + Q10Room, + decompose_layers, + erased_packet, + parse_map_packet, + parse_trace_packet, +) +from roborock.map.b01_q10_overlays import ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP, Q10Zone, parse_zone_blob +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +_LOGGER = logging.getLogger(__name__) + +_TRUNCATE_LENGTH = 20 + +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" + +# World-units-per-pixel candidates for calibration, bracketing the ~13-16 +# measured on live ss07 captures. A dense cleaning path selects the best fit. +_Q10_RESOLUTIONS = [step * 0.5 for step in range(20, 37)] # 10.0 .. 18.0 +# A path needs enough shape to constrain the fit; a few points cannot. +_MIN_CALIBRATION_POINTS = 20 + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing Q10 map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (image metadata + room names).""" + + rooms: list[Q10Room] = field(default_factory=list) + """Rooms (segments) reported by the device, with ids and names.""" + + layers: GridLayers | None = None + """Separable map layers (background / wall / floor / per-room) in grid-pixel + space, each renderable to a transparent PNG for frontend compositing.""" + + path: list[Q10Point] = field(default_factory=list) + """Full path of the current cleaning session (oldest point first). + + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" + + robot_position: Q10Point | None = None + """Current robot position (the most recent path point), if known.""" + + calibration: GridCalibration | None = None + """World<->pixel transform, solved from a cleaning path (see + :meth:`MapContentTrait.solve_calibration`). Required to place the path, + robot position and vector overlays onto the map raster.""" + + zones: list[Q10Zone] = field(default_factory=list) + """Restricted zones (no-go / no-mop) in world coordinates, from the device's + ``dpRestrictedZoneUp``. See :meth:`MapContentTrait.load_overlays`.""" + + virtual_walls: list[Q10Zone] = field(default_factory=list) + """Virtual walls (line segments) in world coordinates.""" + + erase_zones: list[Q10EraseZone] = field(default_factory=list) + """Erase areas (the app's *Erase* tool) in world coordinates, decoded from the + map packet tail. Once a calibration is available the cells inside them are + blanked from the rendered map and every layer (see :meth:`MapContentTrait`).""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + + +class MapContentTrait(MapContent, TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. + + The Q10 has no synchronous get-map request; the device pushes map and trace + packets, which the ``Q10PropertiesApi`` subscribe loop feeds into + :meth:`update_from_map_response`. Consumers read the cached fields and/or + register a callback with :meth:`add_update_listener` to be notified when new + map content arrives. + """ + + def __init__( + self, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + super().__init__() + TraitUpdateListener.__init__(self, logger=_LOGGER) + self._map_parser = B01Q10MapParser(map_parser_config) + + def update_from_map_response(self, message: RoborockMessage) -> bool: + """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. + + Returns ``True`` if the message was a recognized Q10 map (``01 01``) or + trace (``02 01``) packet (so the caller can stop processing it), and + ``False`` otherwise. Update listeners are notified only when a packet is + parsed successfully. + """ + if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: + return False + marker = message.payload[:2] + if marker == _MAP_PACKET_MARKER: + self.raw_api_response = message.payload + try: + self.parse_map_content() + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) + return True + self._notify_update() + return True + if marker == _TRACE_PACKET_MARKER: + try: + trace = parse_trace_packet(message.payload) + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) + return True + self.path = trace.points + self.robot_position = trace.robot_position + self._notify_update() + return True + return False + + def parse_map_content(self) -> None: + """Reparse the cached raw map payload without performing any I/O.""" + if self.raw_api_response is None: + raise RoborockException("No map payload available; no map has been pushed yet") + + try: + parsed = self._map_parser.parse(self.raw_api_response) + packet = parse_map_packet(self.raw_api_response) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse Q10 map data") from ex + + if parsed.image_content is None: + raise RoborockException("Failed to render Q10 map image") + + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.rooms = packet.rooms + self.erase_zones = packet.erase_zones + self.layers = decompose_layers(packet) + if self.calibration is not None: + self._apply_erase(self.calibration) + self._populate_map_data_overlays(self.calibration) + self._place_zones_on_map_data(self.calibration) + + def solve_calibration(self) -> GridCalibration | None: + """Fit and cache the world<->pixel calibration from the current path. + + Requires both a parsed map and a reasonably dense cleaning path (both + arrive as device pushes; the path is only populated during an active + clean). Returns the calibration (also stored on :attr:`calibration`), or + ``None`` if there is no map or the path is too short/featureless to fit. + """ + if self.layers is None or len(self.path) < _MIN_CALIBRATION_POINTS: + return None + calibration = solve_calibration( + self.layers, [(point.x, point.y) for point in self.path], resolutions=_Q10_RESOLUTIONS + ) + if calibration is not None: + self.calibration = calibration + self._apply_erase(calibration) + self._populate_map_data_overlays(calibration) + self._place_zones_on_map_data(calibration) + return calibration + + def load_overlays( + self, + *, + restricted_zone_up: bytes | str | None = None, + virtual_wall_up: bytes | str | None = None, + ) -> None: + """Decode the device's vector-overlay blobs (from the status DPs). + + Pass the raw ``dpRestrictedZoneUp`` / ``dpVirtualWallUp`` values + (``Q10Status.restricted_zone_up`` / ``virtual_wall_up``). Stores them as + world-coordinate :attr:`zones` / :attr:`virtual_walls`, and -- if a + calibration is available -- places them onto ``map_data`` as + ``no_go_areas`` / ``no_mopping_areas`` / ``walls`` in pixel space. + + ``None`` means "data point absent from this update" and leaves the + existing value untouched (a partial status push must not wipe overlays). + An explicit empty blob does clear them. + """ + if restricted_zone_up is not None: + self.zones = parse_zone_blob(restricted_zone_up) + if virtual_wall_up is not None: + self.virtual_walls = parse_zone_blob(virtual_wall_up) + if self.calibration is not None: + self._place_zones_on_map_data(self.calibration) + + def _place_zones_on_map_data(self, calibration: GridCalibration) -> None: + """Convert world-coordinate zones/walls into pixel-space MapData layers.""" + if self.map_data is None: + return + + def to_area(zone: Q10Zone) -> Area | None: + if len(zone.vertices) != 4: + return None # MapData.Area is a quad + pts = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + return Area(pts[0][0], pts[0][1], pts[1][0], pts[1][1], pts[2][0], pts[2][1], pts[3][0], pts[3][1]) + + no_go = [area for zone in self.zones if zone.type == ZONE_TYPE_NO_GO and (area := to_area(zone))] + no_mop = [area for zone in self.zones if zone.type == ZONE_TYPE_NO_MOP and (area := to_area(zone))] + self.map_data.no_go_areas = no_go or None + self.map_data.no_mopping_areas = no_mop or None + + walls: list[Wall] = [] + for zone in self.virtual_walls: + if len(zone.vertices) >= 2: + (x0, y0), (x1, y1) = zone.vertices[0], zone.vertices[1] + p0 = calibration.world_to_pixel(x0, y0) + p1 = calibration.world_to_pixel(x1, y1) + walls.append(Wall(p0[0], p0[1], p1[0], p1[1])) + self.map_data.walls = walls or None + + # The robot starts a session at its dock, so the path origin is the charger. + if self.path: + cx, cy = calibration.world_to_pixel(self.path[0].x, self.path[0].y) + self.map_data.charger = Point(cx, cy) + + def _erased_cells(self, calibration: GridCalibration) -> set[int]: + """Grid-cell indices covered by the erase zones (axis-aligned bbox fill).""" + if not self.erase_zones or self.layers is None: + return set() + width, height = self.layers.width, self.layers.height + cells: set[int] = set() + for zone in self.erase_zones: + pixels = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + xs = [p[0] for p in pixels] + ys = [p[1] for p in pixels] + x0, x1 = int(min(xs)), int(max(xs)) + y0, y1 = int(min(ys)), int(max(ys)) + for py in range(max(0, y0), min(height, y1 + 1)): + for px in range(max(0, x0), min(width, x1 + 1)): + cells.add(py * width + px) + return cells + + def _apply_erase(self, calibration: GridCalibration) -> None: + """Blank erase-zone cells out of the rendered map, layers, and ``map_data``. + + The erase rectangles are world-coordinate areas the user marked for removal + (e.g. phantom floor seen through windows). With a calibration we can place + them in pixel space, blank those cells to background, and re-render so the + phantom areas disappear -- matching what the app shows. + """ + if self.layers is None or self.raw_api_response is None: + return + cells = self._erased_cells(calibration) + if not cells: + return + packet = erased_packet(parse_map_packet(self.raw_api_response), cells) + parsed = self._map_parser.parsed_from_packet(packet) + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.layers = decompose_layers(packet) + + def _populate_map_data_overlays(self, calibration: GridCalibration) -> None: + """Fill MapData.path / vacuum_position in grid-pixel coords. + + Points are stored in grid-pixel space (origin top-left), matching the + Q10's top-down, un-flipped raster so they line up with the rendered image. + """ + if self.map_data is None: + return + pixels = [Point(*calibration.world_to_pixel(point.x, point.y)) for point in self.path] + self.map_data.path = Path(len(pixels), 1, 0, [pixels]) + if self.robot_position is not None: + px, py = calibration.world_to_pixel(self.robot_position.x, self.robot_position.y) + self.map_data.vacuum_position = Point(px, py) + + def render_path_on_map( + self, + *, + line_color: tuple[int, int, int, int] = (235, 64, 52, 255), + position_color: tuple[int, int, int, int] = (255, 211, 0, 255), + ) -> bytes: + """Return the map image (PNG) with the session path + robot position drawn. + + Solves the calibration on demand if not already cached. Raises + :class:`RoborockException` if there is no map, or no calibration can be + fitted (e.g. no cleaning path captured yet). + """ + if self.image_content is None or self.layers is None: + raise RoborockException("No map available; no map has been pushed yet") + calibration = self.calibration or self.solve_calibration() + if calibration is None: + raise RoborockException( + "No calibration available; a cleaning path must be captured (pushed) during a clean" + ) + + scale = self._map_parser.config.map_scale + base = Image.open(io.BytesIO(self.image_content)).convert("RGBA") + + def world_to_image(x: float, y: float) -> tuple[float, float]: + px, py = calibration.world_to_pixel(x, y) + # The ss07 grid renders top-down (no flip), so grid-pixel (px, py) maps + # straight to image space, only upscaled by ``scale``. + return (px * scale, py * scale) + + def to_image(point: Q10Point) -> tuple[float, float]: + return world_to_image(point.x, point.y) + + draw = ImageDraw.Draw(base, "RGBA") + + # Erase zones are applied to the raster itself (cells blanked), so they are + # not drawn here -- the base image already reflects them. + + # No-go (blue) and no-mop (magenta) zones beneath the path. + for zone in self.zones: + if len(zone.vertices) < 3: + continue + polygon = [world_to_image(x, y) for x, y in zone.vertices] + fill = (0, 120, 255, 70) if zone.type == ZONE_TYPE_NO_GO else (255, 0, 200, 70) + outline = (0, 80, 200, 255) if zone.type == ZONE_TYPE_NO_GO else (200, 0, 160, 255) + draw.polygon(polygon, fill=fill, outline=outline) + + if len(self.path) >= 2: + draw.line([to_image(point) for point in self.path], fill=line_color, width=max(1, scale // 2)) + if self.path: # path origin == dock / charger + dx, dy = to_image(self.path[0]) + draw.ellipse([dx - scale, dy - scale, dx + scale, dy + scale], outline=(40, 200, 40, 255), width=2) + if self.robot_position is not None: + cx, cy = to_image(self.robot_position) + radius = scale + draw.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill=position_color) + buffer = io.BytesIO() + base.save(buffer, format="PNG") + return buffer.getvalue() diff --git a/roborock/devices/traits/b01/q7/map_content.py b/roborock/devices/traits/b01/q7/map_content.py index afc36d02..a3efeb27 100644 --- a/roborock/devices/traits/b01/q7/map_content.py +++ b/roborock/devices/traits/b01/q7/map_content.py @@ -17,7 +17,8 @@ from roborock.devices.rpc.b01_q7_channel import MapRpcChannel from roborock.devices.traits import Trait from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig +from roborock.map.b01_grid_layers import GridCalibration, GridLayers +from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig, decompose_q7_layers, q7_calibration from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, Q7RequestMessage from roborock.roborock_typing import RoborockB01Q7Methods @@ -36,6 +37,16 @@ class MapContent(RoborockBase): map_data: MapData | None = None """Parsed map data (metadata for points on the map).""" + layers: GridLayers | None = None + """Separable map layers (background / wall / floor) in grid-pixel space. + + Q7's raster has no per-room segmentation, so ``layers.rooms`` is empty (room + ids/names are in the map metadata).""" + + calibration: GridCalibration | None = None + """World<->pixel transform, read directly from the SCMap ``mapHead`` + (``minX``/``minY``/``resolution``); world coordinates are in metres.""" + raw_api_response: bytes | None = None """Raw bytes of the map payload from the device. @@ -98,3 +109,9 @@ async def refresh(self) -> None: self.image_content = parsed_data.image_content self.map_data = parsed_data.map_data self.raw_api_response = raw_payload + try: + self.layers = decompose_q7_layers(raw_payload) + self.calibration = q7_calibration(raw_payload) + except RoborockException: + self.layers = None + self.calibration = None diff --git a/roborock/map/b01_grid_layers.py b/roborock/map/b01_grid_layers.py new file mode 100644 index 00000000..410d60fd --- /dev/null +++ b/roborock/map/b01_grid_layers.py @@ -0,0 +1,253 @@ +"""Device-agnostic decomposition of a B01 occupancy grid into map layers. + +Both the Q10 (custom binary) and Q7 (SCMap protobuf) deliver their map as a +single-byte-per-cell occupancy grid where the cell *value* encodes a semantic +class (background / wall / per-room floor / ...). This module turns such a grid +into separable **layers** a frontend can stack, without knowing the device's +specific value encoding -- the caller supplies a ``classifier`` mapping a cell +value to a class name, plus the room metadata. + +Coordinates here are **grid-pixel** space (origin top-left of the raw grid, before +any rendering flip/scale). Vector overlays in world/robot coordinates (path, +zones, ...) are placed into this same space by the device's calibration. +""" + +import io +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from math import ceil + +from PIL import Image + +# Canonical layer class names. Devices map their raw cell values onto these. +LAYER_BACKGROUND = "background" +LAYER_WALL = "wall" +LAYER_FLOOR = "floor" +LAYER_UNKNOWN = "unknown" + +_PNG = "PNG" + + +@dataclass +class RoomLayer: + """A single room (segment) and where its pixels sit in the grid.""" + + id: int + name: str + pixel_value: int + pixel_count: int + bbox: tuple[int, int, int, int] + """Inclusive ``(min_x, min_y, max_x, max_y)`` bounding box in grid pixels.""" + + @property + def center(self) -> tuple[float, float]: + """Center of the bounding box in grid-pixel space (for label placement).""" + min_x, min_y, max_x, max_y = self.bbox + return ((min_x + max_x) / 2, (min_y + max_y) / 2) + + +@dataclass +class GridLayers: + """Separable layers decomposed from a single occupancy grid. + + Holds a reference to the raw ``grid`` plus the classifier, and renders any + layer to a transparent RGBA PNG on demand (so we don't eagerly build a mask + per room). ``class_counts`` reports how many cells fall in each class. + """ + + width: int + height: int + grid: bytes + rooms: list[RoomLayer] + classifier: Callable[[int], str] + class_counts: dict[str, int] = field(default_factory=dict) + flip: bool = True + """Whether display rendering flips the grid top-to-bottom. Devices whose grid + is stored bottom-up (V1/Q7 convention) flip; the Q10 grid is stored top-down, + so it does not. Used as the default for the ``render_*`` methods so every + layer matches the device's composited map orientation.""" + + def cell_class(self, value: int) -> str: + """Classify a single raw cell value into a canonical layer name.""" + return self.classifier(value) + + def render_mask( + self, + predicate: Callable[[int], bool], + color: tuple[int, int, int, int], + *, + scale: int = 1, + flip: bool | None = None, + ) -> bytes: + """Render cells matching ``predicate`` as ``color`` over transparency. + + ``flip`` applies the same top-to-bottom flip the composited map uses so + layers line up pixel-for-pixel (defaults to the device's :attr:`flip`); + ``scale`` nearest-neighbour upsamples. + """ + if flip is None: + flip = self.flip + transparent = (0, 0, 0, 0) + px = bytearray() + for value in self.grid: + px.extend(color if predicate(value) else transparent) + img = Image.frombytes("RGBA", (self.width, self.height), bytes(px)) + if flip: + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + if scale > 1: + img = img.resize((self.width * scale, self.height * scale), Image.Resampling.NEAREST) + buf = io.BytesIO() + img.save(buf, format=_PNG) + return buf.getvalue() + + def render_class( + self, layer: str, color: tuple[int, int, int, int], *, scale: int = 1, flip: bool | None = None + ) -> bytes: + """Render a whole class layer (e.g. ``"wall"``) to an RGBA PNG.""" + return self.render_mask(lambda v: self.classifier(v) == layer, color, scale=scale, flip=flip) + + def render_room( + self, room_id: int, color: tuple[int, int, int, int], *, scale: int = 1, flip: bool | None = None + ) -> bytes: + """Render a single room's pixels to an RGBA PNG.""" + room = next((r for r in self.rooms if r.id == room_id), None) + if room is None: + raise KeyError(f"No room with id {room_id}") + target = room.pixel_value + return self.render_mask(lambda v: v == target, color, scale=scale, flip=flip) + + +@dataclass +class GridCalibration: + """Affine transform between device world coordinates and grid pixels. + + ``resolution`` is world-units per pixel. The Y axis is flipped between world + and grid space via ``y_sign`` (devices typically have world-Y increasing + upward while the grid's Y increases downward). + """ + + resolution: float + origin_x: float + origin_y: float + y_sign: int = 1 + + def world_to_pixel(self, x: float, y: float) -> tuple[float, float]: + """Map a world ``(x, y)`` to grid-pixel ``(px, py)``.""" + return (x / self.resolution + self.origin_x, self.origin_y - self.y_sign * y / self.resolution) + + def pixel_to_world(self, px: float, py: float) -> tuple[float, float]: + """Map a grid-pixel ``(px, py)`` back to world ``(x, y)``.""" + return ((px - self.origin_x) * self.resolution, self.y_sign * (self.origin_y - py) * self.resolution) + + +def solve_calibration( + layers: GridLayers, + points: list[tuple[float, float]], + *, + resolutions: Iterable[float], + y_signs: Iterable[int] = (1, -1), +) -> GridCalibration | None: + """Fit a :class:`GridCalibration` by overlaying ``points`` onto the floor. + + A cleaning path must lie on floor (not walls/background). For each candidate + resolution and Y orientation we slide the path's pixel bounding box across + the map and keep the placement that maximises on-floor points while + penalising points landing on walls/background. Returns ``None`` if no + placement lands a clear majority of points on floor. + + The search is bounded: the path bbox size fixes how far it can slide, so the + offset range is small. Needs a reasonably dense, shape-rich path (a long + clean) to be well-constrained. + """ + if not points: + return None + w, h = layers.width, layers.height + classify = layers.classifier + # 1 = floor, 2 = wall/background (blocked), 0 = other. Index by cell. + klass = bytes( + 1 if (c := classify(v)) == LAYER_FLOOR else 2 if c in (LAYER_WALL, LAYER_BACKGROUND) else 0 for v in layers.grid + ) + + best: tuple[float, GridCalibration] | None = None + for resolution in resolutions: + if resolution <= 0: + continue + for y_sign in y_signs: + sx = [x / resolution for x, _ in points] + sy = [y_sign * y / resolution for _, y in points] + min_sx, max_sx, min_sy, max_sy = min(sx), max(sx), min(sy), max(sy) + if (max_sx - min_sx) >= w or (max_sy - min_sy) >= h: + continue # path wider/taller than the map at this resolution + pts = list(zip(sx, sy)) + # Slide so every point stays in-bounds: px = px_f + ox in [0, w), py = oy - py_f in [0, h). + for ox in range(ceil(-min_sx), ceil(w - max_sx)): + for oy in range(ceil(max_sy), ceil(h + min_sy)): + on_floor = 0 + blocked = 0 + for px_f, py_f in pts: + cell = int(oy - py_f) * w + int(px_f + ox) + k = klass[cell] + if k == 1: + on_floor += 1 + elif k == 2: + blocked += 1 + score = on_floor - 1.5 * blocked + if best is None or score > best[0]: + best = (score, GridCalibration(float(resolution), float(ox), float(oy), y_sign)) + + if best is None or best[0] < len(points) * 0.5: + return None + return best[1] + + +def decompose_grid( + width: int, + height: int, + grid: bytes, + rooms: Iterable[tuple[int, str, int, int]], + classifier: Callable[[int], str], + *, + flip: bool = True, +) -> GridLayers: + """Build :class:`GridLayers` from a grid + room records + a classifier. + + ``rooms`` items are ``(id, name, pixel_value, pixel_count)`` tuples. Per-room + bounding boxes are computed in one pass over the grid. ``flip`` records the + device's display orientation (see :attr:`GridLayers.flip`). + """ + room_meta = list(rooms) + bboxes: dict[int, list[int]] = {pv: [width, height, -1, -1] for (_, _, pv, _) in room_meta} + counts: dict[str, int] = {} + for index, value in enumerate(grid): + cls = classifier(value) + counts[cls] = counts.get(cls, 0) + 1 + box = bboxes.get(value) + if box is not None: + x = index % width + y = index // width + if x < box[0]: + box[0] = x + if y < box[1]: + box[1] = y + if x > box[2]: + box[2] = x + if y > box[3]: + box[3] = y + + room_layers: list[RoomLayer] = [] + for room_id, name, pixel_value, pixel_count in room_meta: + box = bboxes[pixel_value] + bbox = (box[0], box[1], box[2], box[3]) if box[2] >= 0 else (0, 0, 0, 0) + room_layers.append( + RoomLayer(id=room_id, name=name, pixel_value=pixel_value, pixel_count=pixel_count, bbox=bbox) + ) + + return GridLayers( + width=width, + height=height, + grid=grid, + rooms=room_layers, + classifier=classifier, + class_counts=counts, + flip=flip, + ) diff --git a/roborock/map/b01_map_parser.py b/roborock/map/b01_map_parser.py index b57912e4..30b5887e 100644 --- a/roborock/map/b01_map_parser.py +++ b/roborock/map/b01_map_parser.py @@ -15,11 +15,67 @@ from roborock.exceptions import RoborockException from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_WALL, + GridCalibration, + GridLayers, + decompose_grid, +) from .map_parser import ParsedMapData _MAP_FILE_FORMAT = "PNG" +# The Q7 occupancy grid encodes only these classes (no per-room segmentation in +# the raster -- room ids/names live in the protobuf metadata, not the pixels). +_Q7_WALL_VALUE = 127 +_Q7_FLOOR_VALUE = 128 + + +def classify_q7_cell(value: int) -> str: + """Map a Q7 SCMap grid cell value to a canonical layer class.""" + if value == _Q7_WALL_VALUE: + return LAYER_WALL + if value == _Q7_FLOOR_VALUE: + return LAYER_FLOOR + return LAYER_BACKGROUND # 0 = outside / unknown + + +def decompose_q7_layers(payload: bytes) -> GridLayers: + """Split an inflated Q7 SCMap into background / wall / floor layers. + + Q7 has no per-room raster, so ``GridLayers.rooms`` is empty; room ids/names + are available separately via the map metadata. Reuses the same device-agnostic + decomposition as the Q10. + """ + parsed = _parse_scmap_payload(payload) + size_x, size_y, grid = _extract_grid(parsed) + return decompose_grid(size_x, size_y, grid, [], classify_q7_cell) + + +def q7_calibration(payload: bytes) -> GridCalibration | None: + """Build a world<->pixel calibration straight from the Q7 ``mapHead``. + + Unlike the Q10 (whose packet carries no calibration), the Q7 SCMap header + provides ``minX``/``minY``/``resolution`` directly, so no path fitting is + needed. World coordinates are in metres; resolution is metres-per-pixel. + """ + head = _parse_scmap_payload(payload).mapHead + if not head.HasField("resolution") or head.resolution <= 0 or not head.HasField("sizeY"): + return None + resolution = head.resolution + min_x = head.minX if head.HasField("minX") else 0.0 + min_y = head.minY if head.HasField("minY") else 0.0 + return GridCalibration( + resolution=resolution, + origin_x=-min_x / resolution, + origin_y=(head.sizeY - 1) + min_y / resolution, + y_sign=1, + ) + + @dataclass class B01MapParserConfig: """Configuration for the B01/Q7 map parser.""" diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py new file mode 100644 index 00000000..09785885 --- /dev/null +++ b/roborock/map/b01_q10_map_parser.py @@ -0,0 +1,446 @@ +"""Parser for Roborock Q10 (B01/ss07) map packets. + +Q10 devices deliver map data as a protocol-301 ``MAP_RESPONSE`` message (pushed a +few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf +format, the Q10 uses a custom, unencrypted binary packet: + +- ``01 01`` marker, then a ``u32be`` map id and a ``u16le`` grid width. +- A header field at offset 27 (``u16be``) giving the compressed layout length. +- An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it + is ``width * height`` cells of grid data followed by room metadata records. +- Room metadata begins with ``01 `` followed by fixed 47-byte + records (id, hints, ascii name). Each room paints cells with value + ``room_id * 4`` in the grid. + +The packet layout was confirmed against live Q10 captures. The format +documentation that informed this clean-room implementation comes from the +``roborock-qseries-map-bridge`` project (GPL-3.0-or-later): +https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge +""" + +import colorsys +import io +from dataclasses import dataclass, field, replace + +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException + +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_UNKNOWN, + LAYER_WALL, + GridLayers, + decompose_grid, +) +from .map_parser import ParsedMapData + +_MAP_FILE_FORMAT = "PNG" + +# Semantic raster classes, confirmed against real ss07 captures (rendered and +# eyeballed): 243 is the background outside the home (~half the grid), 240 is +# scanned floor not yet assigned to a room, other values >= 240 are walls, and +# 0 < value < 240 are per-room floor cells (value == room_id * 4). +_BACKGROUND_VALUE = 243 +_UNSEGMENTED_FLOOR_VALUE = 240 + + +def classify_q10_cell(value: int) -> str: + """Map a Q10 grid cell value to a canonical layer class.""" + if value == 0: + return LAYER_UNKNOWN + if value == _BACKGROUND_VALUE: + return LAYER_BACKGROUND + if value == _UNSEGMENTED_FLOOR_VALUE: + return LAYER_FLOOR + if value >= _WALL_THRESHOLD: + return LAYER_WALL + return LAYER_FLOOR + + +def decompose_layers(packet: "Q10MapPacket") -> GridLayers: + """Split a parsed Q10 map packet into separable grid-pixel layers.""" + rooms = [(room.id, room.name, room.pixel_value, room.pixel_count) for room in packet.rooms] + # The ss07 grid is stored top-down (row 0 = top), so no display flip is applied. + return decompose_grid(packet.width, packet.height, packet.grid, rooms, classify_q10_cell, flip=False) + + +MAP_PACKET_MARKER = b"\x01\x01" +TRACE_PACKET_MARKER = b"\x02\x01" + +_MAP_ID_OFFSET = 2 +_WIDTH_OFFSET = 8 +_COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 +_LAYOUT_COMPRESSED_OFFSET = 29 +_ROOM_RECORD_LENGTH = 47 +_ROOM_NAME_LENGTH_OFFSET = 26 +_MAX_ROOMS = 32 +# Sanity bound for the erase-zone vector section's vertices-per-polygon field. +_MAX_ERASE_ZONE_VERTICES = 16 + +# Grid cell values >= this are walls / borders rather than room segments. +_WALL_THRESHOLD = 240 + + +@dataclass +class Q10Room: + """A room (segment) described in a Q10 map packet.""" + + id: int + raw_name: str + pixel_value: int + pixel_count: int + + @property + def name(self) -> str: + """User friendly room name (firmware ``rr_`` defaults are normalized).""" + return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() + + +@dataclass +class Q10EraseZone: + """A user-drawn "erase" area (polygon) carried in the map packet. + + These are the app's *Erase* tool rectangles -- regions the user marked to be + removed from the map (e.g. phantom floor the lidar mapped through windows). + Coordinates are world units (millimetres), same frame as the path/zones. + + Confirmed by a controlled diff: removing the two erase zones on a live device + dropped this section's count from 2 to 0 while the grid and the trailing + raster were byte-identical. (Earlier revisions misidentified this section as + "carpets"; it is the erase-zone list.) + """ + + vertices: list[tuple[int, int]] = field(default_factory=list) + + +@dataclass +class Q10MapPacket: + """Decoded contents of a Q10 ``01 01`` map packet.""" + + map_id: int + width: int + height: int + grid: bytes + rooms: list[Q10Room] = field(default_factory=list) + erase_zones: list[Q10EraseZone] = field(default_factory=list) + """Erase areas decoded from the packet tail (world coordinates).""" + + +@dataclass +class Q10Point: + """A single point in Q10 map/trace coordinate space.""" + + x: int + y: int + + +@dataclass +class Q10TracePacket: + """Decoded contents of a Q10 ``02 01`` cleaning-path packet. + + The robot accumulates the **full path of the current cleaning session** and + serves it in a single packet: ``points`` holds the whole trajectory so far + (oldest first), growing as the robot cleans. This was confirmed live -- a + corridor run produced packets of 1, then 3, then 15 points, each a strict + superset describing the path travelled. Because the robot keeps the path + server-side, a client that connects mid-session still receives the complete + path (this is how the app shows the trail even after a cold launch). + + The robot only emits these while a session is active, so an idle/docked robot + will not produce them. The most recent point is the current robot position. + """ + + points: list[Q10Point] = field(default_factory=list) + sequence: int = 0 + """Session counter (byte 3); increments per cleaning session, tracking the + device clean count. Not a per-packet sequence.""" + + @property + def robot_position(self) -> Q10Point | None: + """The current robot position (the most recent point).""" + return self.points[-1] if self.points else None + + +# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# point pairs forming the accumulated session path. Header layout confirmed +# against live ss07 captures: byte 3 is a session counter (tracks the device +# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point +# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body +# rather than trusting the count field, so a truncated tail can't desync it. +# NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) +# did not match this firmware -- this 10-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 10 +_TRACE_SEQUENCE_OFFSET = 3 + + +def is_map_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 full-map (``01 01``) packet.""" + return payload[:2] == MAP_PACKET_MARKER + + +def is_trace_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 live trace (``02 01``) packet.""" + return payload[:2] == TRACE_PACKET_MARKER + + +def parse_trace_packet(payload: bytes) -> Q10TracePacket: + """Parse a Q10 ``02 01`` trace packet into path points + robot position.""" + if not is_trace_packet(payload): + raise RoborockException("Payload is not a Q10 trace packet") + if len(payload) < _TRACE_HEADER_LENGTH: + raise RoborockException("Q10 trace packet is shorter than its header") + body = payload[_TRACE_HEADER_LENGTH:] + if len(body) % 4: + raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + + points = [ + Q10Point( + x=int.from_bytes(body[offset : offset + 2], "big", signed=True), + y=int.from_bytes(body[offset + 2 : offset + 4], "big", signed=True), + ) + for offset in range(0, len(body), 4) + ] + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + + +def lz4_block_decompress(data: bytes) -> bytes: + """Decompress a raw LZ4 *block* (no frame header). + + The Q10 map grid is stored as a single LZ4 block. This implements the + standard LZ4 block format so we don't add a native dependency. + """ + index = 0 + output = bytearray() + + def read_length(value: int) -> int: + nonlocal index + if value != 0x0F: + return value + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading length") + part = data[index] + index += 1 + value += part + if part != 0xFF: + return value + + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading token") + token = data[index] + index += 1 + + literal_length = read_length((token >> 4) & 0x0F) + end = index + literal_length + if end > len(data): + raise RoborockException("Truncated LZ4 block while reading literals") + output.extend(data[index:end]) + index = end + + if index == len(data): + return bytes(output) + if index + 2 > len(data): + raise RoborockException("Truncated LZ4 block while reading match offset") + + offset = data[index] | (data[index + 1] << 8) + index += 2 + if offset == 0 or offset > len(output): + raise RoborockException("Invalid LZ4 back-reference offset") + + match_length = read_length(token & 0x0F) + 4 + for _ in range(match_length): + output.append(output[-offset]) + + +def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: + """Split the inflated layout into (height, grid, room_data). + + The grid is ``width * height`` cells; the remaining bytes are room records + introduced by an ``01 `` marker. The room count is unknown up + front, so we search for the split that makes the grid rectangular and lines + up with the marker. + """ + for room_count in range(0, _MAX_ROOMS + 1): + room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH + area = len(decoded) - room_data_length + if area <= 0 or area % width: + continue + room_data = decoded[area:] + if room_data[0] == 1 and room_data[1] == room_count: + return area // width, decoded[:area], room_data + raise RoborockException("Could not infer Q10 layout dimensions / room records") + + +def _parse_rooms(room_data: bytes, grid: bytes) -> list[Q10Room]: + rooms: list[Q10Room] = [] + room_count = room_data[1] + for index in range(room_count): + start = 2 + index * _ROOM_RECORD_LENGTH + record = room_data[start : start + _ROOM_RECORD_LENGTH] + room_id = int.from_bytes(record[0:2], "big") + name_length = record[_ROOM_NAME_LENGTH_OFFSET] + raw_name = record[27 : 27 + name_length].decode("utf-8", errors="replace") + pixel_value = (room_id * 4) & 0xFF + rooms.append( + Q10Room( + id=room_id, + raw_name=raw_name, + pixel_value=pixel_value, + pixel_count=grid.count(pixel_value), + ) + ) + return rooms + + +def parse_map_packet(payload: bytes) -> Q10MapPacket: + """Parse a Q10 ``01 01`` map packet into grid + room metadata.""" + if len(payload) < _LAYOUT_COMPRESSED_OFFSET or not is_map_packet(payload): + raise RoborockException("Payload is not a Q10 map packet") + + map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "little") + if width <= 0: + raise RoborockException("Q10 map packet has invalid width") + + compressed_length = int.from_bytes( + payload[_COMPRESSED_LAYOUT_LENGTH_OFFSET : _COMPRESSED_LAYOUT_LENGTH_OFFSET + 2], "big" + ) + layout_end = _LAYOUT_COMPRESSED_OFFSET + compressed_length + if compressed_length <= 0 or layout_end > len(payload): + raise RoborockException("Q10 map packet has invalid layout block length") + + decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) + height, grid, room_data = _infer_layout(decoded, width) + rooms = _parse_rooms(room_data, grid) + erase_zones = _parse_erase_zones(payload[layout_end:]) + return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms, erase_zones=erase_zones) + + +def _parse_erase_zones(tail: bytes) -> list[Q10EraseZone]: + """Decode erase areas from the bytes after the compressed grid block. + + The tail begins with a vector section ``[count: u8][vertices_per: u8]`` then + ``count`` polygons of ``vertices_per`` int16-BE (x, y) pairs (axis-aligned + rectangles in practice). Identified by a controlled diff on a live ss07 + device: deleting the two app *Erase* zones dropped ``count`` 2->0 with the + rest of the packet byte-identical. The remaining tail (a run-length raster + + signature) is unrelated to erase and is not decoded here. + """ + if len(tail) < 2: + return [] + count = tail[0] + vertices_per = tail[1] + if count == 0 or not 1 <= vertices_per <= _MAX_ERASE_ZONE_VERTICES: + return [] + + erase_zones: list[Q10EraseZone] = [] + offset = 2 + for _ in range(count): + end = offset + vertices_per * 4 + if end > len(tail): + break + vertices = [ + ( + int.from_bytes(tail[offset + j * 4 : offset + j * 4 + 2], "big", signed=True), + int.from_bytes(tail[offset + j * 4 + 2 : offset + j * 4 + 4], "big", signed=True), + ) + for j in range(vertices_per) + ] + erase_zones.append(Q10EraseZone(vertices=vertices)) + offset = end + return erase_zones + + +def erased_packet(packet: "Q10MapPacket", cells: set[int]) -> "Q10MapPacket": + """Return a copy of ``packet`` with ``cells`` (grid indices) set to background. + + Used to apply the app's erase zones: cells inside an erase rectangle are blanked + to the background class so they drop out of the rendered map and every layer. + """ + if not cells: + return packet + grid = bytearray(packet.grid) + for cell in cells: + if 0 <= cell < len(grid): + grid[cell] = _BACKGROUND_VALUE + return replace(packet, grid=bytes(grid)) + + +@dataclass +class B01Q10MapParserConfig: + """Configuration for the Q10 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01Q10MapParser: + """Decoder/renderer for Q10 ``MAP_RESPONSE`` (protocol 301) payloads.""" + + def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: + self._config = config or B01Q10MapParserConfig() + + @property + def config(self) -> B01Q10MapParserConfig: + """The parser configuration (image scale, ...).""" + return self._config + + def parse(self, payload: bytes) -> ParsedMapData: + """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" + return self.parsed_from_packet(parse_map_packet(payload)) + + def parsed_from_packet(self, packet: Q10MapPacket) -> ParsedMapData: + """Render a (possibly erase-modified) packet into a PNG + ``MapData``.""" + image = self._render(packet) + + map_data = MapData() + map_data.image = ImageData( + size=packet.width * packet.height, + top=0, + left=0, + height=packet.height, + width=packet.width, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + room_names = {room.id: room.name for room in packet.rooms} + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) + + def _render(self, packet: Q10MapPacket) -> Image.Image: + """Render the Q10 grid: rooms get distinct colors, walls white, rest dark.""" + palette = _build_palette(packet.grid) + rgb = bytearray() + for value in packet.grid: + rgb.extend(palette[value]) + img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) + # The ss07 grid is stored top-down (row 0 = top of the home), so it is + # rendered as-is -- unlike the V1/Q7 convention, no vertical flip. + scale = self._config.map_scale + if scale > 1: + img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) + return img + + +def _build_palette(grid: bytes) -> list[tuple[int, int, int]]: + """Map each grid value to an RGB color (rooms distinct, walls white).""" + palette: list[tuple[int, int, int]] = [(28, 30, 38)] * 256 # default: unknown/outside + room_values = sorted({v for v in set(grid) if 0 < v < _WALL_THRESHOLD}) + for index, value in enumerate(room_values): + hue = (index * 0.139) % 1.0 + r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95) + palette[value] = (int(r * 255), int(g * 255), int(b * 255)) + for value in range(_WALL_THRESHOLD, 256): + palette[value] = (235, 235, 240) # walls / borders + palette[0] = (28, 30, 38) + return palette diff --git a/roborock/map/b01_q10_overlays.py b/roborock/map/b01_q10_overlays.py new file mode 100644 index 00000000..e371fdaa --- /dev/null +++ b/roborock/map/b01_q10_overlays.py @@ -0,0 +1,89 @@ +"""Decoders for Q10 (B01/ss07) map vector overlays. + +No-go zones, no-mop zones, virtual walls and zoned-clean areas are not part of +the map raster; the device reports them as base64-encoded blobs in separate data +points (``dpRestrictedZoneUp`` 55, ``dpVirtualWallUp`` 57, ``dpZonedUp`` 59). + +The blob format was reverse-engineered from a live ss07 (confirmed against 7 +real no-go zones): + + [version: u8][count: u8] then ``count`` fixed-size records, each: + [type: u8][vertex_count: u8] then vertex_count (x, y) int16-BE pairs, + zero-padded to the record size. + +Coordinates are in the device's world units (the same space as the cleaning +path), so a :class:`~roborock.map.b01_grid_layers.GridCalibration` maps them to +map pixels. ``type`` distinguishes the restriction kind (0 = no-go, 3 = no-mop +observed); it is preserved verbatim so callers can route polygons to the right +``MapData`` layer. +""" + +import base64 +from dataclasses import dataclass, field + +_DEFAULT_RECORD_SIZE = 38 # 2-byte record header + up to 9 (x, y) int16 pairs + + +@dataclass +class Q10Zone: + """A polygon overlay (no-go / no-mop / virtual wall) in world coordinates.""" + + type: int + vertices: list[tuple[int, int]] = field(default_factory=list) + + +def _as_bytes(data: bytes | str | None) -> bytes: + if data is None: + return b"" + if isinstance(data, bytes): + return data + try: + return base64.b64decode(data + "=" * (-len(data) % 4)) + except (ValueError, base64.binascii.Error): # type: ignore[attr-defined] + return b"" + + +def parse_zone_blob(data: bytes | str | None) -> list[Q10Zone]: + """Decode a Q10 zone/wall overlay blob into a list of :class:`Q10Zone`. + + Accepts the raw bytes or the base64 string straight from the data point. + Returns ``[]`` for empty/absent/unparsable blobs (the device sends a single + ``0x00`` byte when there are none). + """ + raw = _as_bytes(data) + if len(raw) < 2: + return [] + count = raw[1] + if count <= 0: + return [] + + body = raw[2:] + record_size = len(body) // count if count and len(body) % count == 0 else _DEFAULT_RECORD_SIZE + if record_size < 2: + return [] + + zones: list[Q10Zone] = [] + for index in range(count): + record = body[index * record_size : (index + 1) * record_size] + if len(record) < 2: + break + zone_type = record[0] + vertex_count = record[1] + needed = 2 + vertex_count * 4 + if needed > len(record): + continue # malformed record; skip rather than misread padding + vertices = [ + ( + int.from_bytes(record[2 + j * 4 : 4 + j * 4], "big", signed=True), + int.from_bytes(record[4 + j * 4 : 6 + j * 4], "big", signed=True), + ) + for j in range(vertex_count) + ] + zones.append(Q10Zone(type=zone_type, vertices=vertices)) + return zones + + +# Observed ``type`` values. 0 = no-go (vacuum forbidden), 3 = no-mop. Others are +# surfaced verbatim on Q10Zone.type for callers that recognise them. +ZONE_TYPE_NO_GO = 0 +ZONE_TYPE_NO_MOP = 3 diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py new file mode 100644 index 00000000..e026b3ba --- /dev/null +++ b/tests/devices/traits/b01/q10/test_map.py @@ -0,0 +1,278 @@ +"""Tests for the Q10 B01 map content trait. + +The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages +and the trait updates its cached state from them via ``update_from_map_response`` +(there is no synchronous get-map request). +""" + +import asyncio +import io +from collections.abc import AsyncGenerator +from pathlib import Path +from unittest.mock import AsyncMock, Mock + +import pytest +from PIL import Image + +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create +from roborock.devices.traits.b01.q10.map import MapContentTrait +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import GridCalibration +from roborock.map.b01_q10_map_parser import Q10EraseZone, Q10Point +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") + + +def _map_message( + payload: bytes, protocol: RoborockMessageProtocol = RoborockMessageProtocol.MAP_RESPONSE +) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") + + +def test_update_from_map_response_populates_image_and_rooms() -> None: + """A pushed 01 01 map packet populates the image, rooms and map data.""" + payload = FIXTURE.read_bytes() + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + assert trait.update_from_map_response(_map_message(payload)) is True + + assert trait.raw_api_response == payload + assert trait.image_content is not None + assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} + assert trait.map_data is not None + assert len(updates) == 1 + + +def test_update_from_map_response_populates_path_and_position() -> None: + """A pushed 02 01 trace packet populates the path and robot position.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + assert trait.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) is True + + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert len(updates) == 1 + + +def test_update_from_map_response_ignores_non_map_messages() -> None: + """Non-MAP_RESPONSE messages are left for the status path to handle.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) + assert trait.update_from_map_response(rpc) is False + + # An unrecognized MAP_RESPONSE marker is also not consumed. + assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False + + assert trait.image_content is None + assert not trait.path + assert not updates + + +def test_parse_without_data_raises() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map payload available"): + trait.parse_map_content() + + +# --- Integration through the Q10PropertiesApi subscribe loop ----------------- + + +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_channel(message_queue: asyncio.Queue[RoborockMessage]) -> AsyncMock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + + channel = AsyncMock() + channel.subscribe_stream = Mock(return_value=mock_stream()) + return channel + + +@pytest.fixture +async def q10_api(mock_channel: AsyncMock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def _wait_for(predicate, timeout: float = 2.0) -> None: + async with asyncio.timeout(timeout): + while not predicate(): + await asyncio.sleep(0.01) + + +async def test_subscribe_loop_routes_map_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A map pushed onto the stream is routed to the map trait by the loop.""" + assert q10_api.map.image_content is None + + message_queue.put_nowait(_map_message(FIXTURE.read_bytes())) + + await _wait_for(lambda: q10_api.map.image_content is not None) + assert {room.id: room.name for room in q10_api.map.rooms} == {2: "Living Room", 3: "Bedroom"} + + +async def test_subscribe_loop_routes_trace_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A trace pushed onto the stream is routed to the map trait by the loop.""" + assert not q10_api.map.path + + message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) + + await _wait_for(lambda: bool(q10_api.map.path)) + assert q10_api.map.robot_position is not None + + +# --- Layers / calibration / rendering ---------------------------------------- + + +def _trait_with_map() -> MapContentTrait: + """A trait with a map already pushed into it.""" + trait = MapContentTrait() + trait.update_from_map_response(_map_message(FIXTURE.read_bytes())) + return trait + + +def test_map_push_populates_layers() -> None: + """A pushed map is also decomposed into separable layers.""" + trait = _trait_with_map() + assert trait.layers is not None + assert trait.layers.class_counts.get("floor") == 26 + assert {room.id for room in trait.layers.rooms} == {2, 3} + + +def test_solve_calibration_needs_map_and_dense_path() -> None: + """No map or too-short a path -> no calibration.""" + trait = MapContentTrait() + trait.path = [Q10Point(i, 0) for i in range(30)] + assert trait.solve_calibration() is None # no layers yet + + +def test_render_path_on_map_requires_map() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map available"): + trait.render_path_on_map() + + +def test_render_path_on_map_draws_position() -> None: + """With a calibration set, the robot position is drawn at the mapped pixel.""" + trait = _trait_with_map() + # identity-ish calibration: world (x,y) -> pixel (x, 5 - y) in the 8x6 grid. + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 2), Q10Point(3, 2)] + # world (3, 2) -> grid pixel (3, 3); the ss07 grid renders top-down (no flip), + # so that maps straight to image (12, 12) at scale 4. + trait.robot_position = Q10Point(3, 2) + png = trait.render_path_on_map(position_color=(255, 211, 0, 255)) + img = Image.open(io.BytesIO(png)).convert("RGBA") + assert img.size == (8 * 4, 6 * 4) + assert img.getpixel((12, 12)) == (255, 211, 0, 255) + + +def test_parse_map_content_preserves_path_overlays_after_calibration() -> None: + """Reparsing a calibrated map keeps path and vacuum position on MapData.""" + trait = _trait_with_map() + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 2), Q10Point(3, 2)] + trait.robot_position = Q10Point(3, 2) + + trait.parse_map_content() + + assert trait.map_data is not None + assert trait.map_data.path is not None + assert trait.map_data.vacuum_position is not None + assert (trait.map_data.vacuum_position.x, trait.map_data.vacuum_position.y) == (3.0, 3.0) + + +def test_load_overlays_places_zones_with_calibration() -> None: + """Decoded no-go / no-mop zones become pixel-space MapData areas.""" + trait = _trait_with_map() + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 1)] # path origin -> charger + + def rect(zone_type: int, corners: list[tuple[int, int]]) -> bytes: + out = bytes([zone_type, len(corners)]) + for x, y in corners: + out += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + return out.ljust(18, b"\x00") + + blob = bytes([1, 2]) + rect(0, [(0, 0), (4, 0), (4, 4), (0, 4)]) + rect(3, [(1, 1), (2, 1), (2, 2), (1, 2)]) + trait.load_overlays(restricted_zone_up=blob) + + assert len(trait.zones) == 2 + assert trait.map_data is not None + assert len(trait.map_data.no_go_areas or []) == 1 + assert len(trait.map_data.no_mopping_areas or []) == 1 + # charger = path origin in pixels: (1, 5-1) = (1, 4) + assert trait.map_data.charger is not None + assert (trait.map_data.charger.x, trait.map_data.charger.y) == (1.0, 4.0) + + +def test_apply_erase_blanks_cells_with_calibration() -> None: + """With a calibration, erase-zone cells are blanked from the layers + image.""" + trait = _trait_with_map() + assert trait.layers is not None + before_floor = trait.layers.class_counts.get("floor") + before_image = trait.image_content + assert before_floor and before_floor > 0 + + # identity-ish calibration: world (x, y) -> pixel (x, 5 - y) over the 8x6 grid. + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + # A rectangle covering the whole grid in world coords erases every cell. + trait.erase_zones = [Q10EraseZone(vertices=[(0, 0), (7, 0), (7, 5), (0, 5)])] + trait._apply_erase(trait.calibration) + + assert trait.layers.class_counts.get("floor", 0) == 0 # all floor erased + assert trait.image_content != before_image # re-rendered + + +def test_apply_erase_partial_rectangle() -> None: + """An erase rectangle only blanks the cells it covers, leaving the rest.""" + trait = _trait_with_map() + assert trait.layers is not None + before_floor = trait.layers.class_counts.get("floor", 0) + + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + # Cover only the top two grid rows (pixel y 0..1 -> world y 4..5). + trait.erase_zones = [Q10EraseZone(vertices=[(0, 4), (7, 4), (7, 5), (0, 5)])] + trait._apply_erase(trait.calibration) + + after_floor = trait.layers.class_counts.get("floor", 0) + assert 0 < after_floor < before_floor # some, not all, floor removed + + +def test_load_overlays_partial_update_keeps_existing_zones() -> None: + """A status push without the zone DP (None) must not wipe loaded zones.""" + trait = MapContentTrait() + blob = ( + bytes([1, 1]) + + bytes([0, 4]) + + b"".join(int.to_bytes(v & 0xFFFF, 2, "big") for xy in [(0, 0), (4, 0), (4, 4), (0, 4)] for v in xy) + ) + trait.load_overlays(restricted_zone_up=blob) + assert len(trait.zones) == 1 + # A later partial update carrying only the (empty) virtual-wall DP. + trait.load_overlays(restricted_zone_up=None, virtual_wall_up=b"\x00") + assert len(trait.zones) == 1 # zones preserved + assert trait.virtual_walls == [] diff --git a/tests/map/test_b01_grid_layers.py b/tests/map/test_b01_grid_layers.py new file mode 100644 index 00000000..c6809f94 --- /dev/null +++ b/tests/map/test_b01_grid_layers.py @@ -0,0 +1,162 @@ +"""Tests for the device-agnostic grid->layers decomposition + Q10 classifier.""" + +import io +from pathlib import Path + +import pytest +from PIL import Image + +from roborock.map.b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_WALL, + GridCalibration, + decompose_grid, + solve_calibration, +) +from roborock.map.b01_q10_map_parser import ( + classify_q10_cell, + decompose_layers, + parse_map_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + (0, "unknown"), + (8, LAYER_FLOOR), + (12, LAYER_FLOOR), + (240, LAYER_FLOOR), + (243, LAYER_BACKGROUND), + (249, LAYER_WALL), + ], +) +def test_classify_q10_cell(value: int, expected: str) -> None: + assert classify_q10_cell(value) == expected + + +def test_decompose_grid_generic_classifier_and_bbox() -> None: + """A hand-built grid decomposes into the right classes with room bboxes.""" + # 4x3 grid: row0 walls(9), row1 = floor room 5 (value 20) at x1..2, row2 background(7) + grid = bytes( + [ + 9, + 9, + 9, + 9, + 0, + 20, + 20, + 0, + 7, + 7, + 7, + 7, + ] + ) + + def classify(v: int) -> str: + if v == 9: + return LAYER_WALL + if v == 7: + return LAYER_BACKGROUND + if v == 0: + return "unknown" + return LAYER_FLOOR + + layers = decompose_grid(4, 3, grid, [(5, "Office", 20, 2)], classify) + assert layers.class_counts == {LAYER_WALL: 4, "unknown": 2, LAYER_FLOOR: 2, LAYER_BACKGROUND: 4} + assert len(layers.rooms) == 1 + room = layers.rooms[0] + assert room.id == 5 and room.name == "Office" and room.pixel_value == 20 + assert room.bbox == (1, 1, 2, 1) # the two floor cells, row 1, x in 1..2 + assert room.center == (1.5, 1.0) + + +def test_render_mask_produces_transparent_rgba() -> None: + grid = bytes([0, 20, 20, 0]) + + def classify(v: int) -> str: + return LAYER_FLOOR if v == 20 else "unknown" + + layers = decompose_grid(4, 1, grid, [(5, "Office", 20, 2)], classify) + png = layers.render_class(LAYER_FLOOR, (10, 20, 30, 255), flip=False) + img = Image.open(io.BytesIO(png)) + assert img.mode == "RGBA" and img.size == (4, 1) + pixels = list(img.getdata()) + assert pixels == [(0, 0, 0, 0), (10, 20, 30, 255), (10, 20, 30, 255), (0, 0, 0, 0)] + + +def test_render_scale_upsamples() -> None: + layers = decompose_grid(2, 1, bytes([20, 20]), [(5, "R", 20, 2)], lambda v: LAYER_FLOOR) + png = layers.render_class(LAYER_FLOOR, (1, 2, 3, 4), scale=3) + assert Image.open(io.BytesIO(png)).size == (6, 3) + + +def test_decompose_layers_on_q10_fixture() -> None: + """The Q10 synthetic fixture splits into floor + per-room layers.""" + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + assert layers.class_counts.get(LAYER_FLOOR) == 26 + names = {room.id: room.name for room in layers.rooms} + assert names == {2: "Living Room", 3: "Bedroom"} + # Each room renders to a valid PNG and only its own pixels are opaque. + living = layers.render_room(2, (255, 0, 0, 255)) + img = Image.open(io.BytesIO(living)) + opaque = sum(1 for *_rgb, a in img.getdata() if a > 0) + assert opaque == next(r.pixel_count for r in layers.rooms if r.id == 2) + + +def test_render_room_unknown_id_raises() -> None: + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + with pytest.raises(KeyError): + layers.render_room(999, (0, 0, 0, 255)) + + +def test_calibration_roundtrip() -> None: + cal = GridCalibration(resolution=2.0, origin_x=3.0, origin_y=8.0, y_sign=1) + assert cal.world_to_pixel(0, 0) == (3.0, 8.0) + assert cal.world_to_pixel(10, 10) == (8.0, 3.0) # +x right, +y up (flipped) + back = cal.pixel_to_world(*cal.world_to_pixel(6, -4)) + assert back == (6, -4) + + +def _floor_block_layers(): + """12x12 grid with a 6x6 floor block at x[3..8], y[3..8]; rest background.""" + grid = bytearray([9] * 144) # 9 = background + for y in range(3, 9): + for x in range(3, 9): + grid[y * 12 + x] = 1 # 1 = floor + classify = {1: LAYER_FLOOR, 9: LAYER_BACKGROUND} + return decompose_grid(12, 12, bytes(grid), [], lambda v: classify.get(v, "other")) + + +def test_solve_calibration_recovers_known_transform() -> None: + """A 2D-spread path over a floor block pins resolution + origin uniquely.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + # World points chosen to land across the floor block (2D spread, not 1D). + pixels = [(3, 8), (8, 8), (3, 3), (8, 3), (5, 5), (6, 6), (4, 7), (7, 4)] + points = [true.pixel_to_world(px, py) for px, py in pixels] + cal = solve_calibration(layers, points, resolutions=[1.0, 1.5, 2.0, 2.5, 3.0]) + assert cal is not None + assert cal.resolution == 2.0 + assert (cal.origin_x, cal.origin_y, cal.y_sign) == (3.0, 8.0, 1) + + +def test_solve_calibration_considers_right_edge_fit() -> None: + """A valid placement is not skipped when the path bbox reaches the map edge.""" + layers = decompose_grid(4, 2, bytes([1] * 8), [], lambda v: LAYER_FLOOR) + points = [(0.0, 0.0), (3.2, 0.0), (6.4, 0.0)] + cal = solve_calibration(layers, points, resolutions=[2.0], y_signs=[1]) + assert cal is not None + assert (cal.origin_x, cal.origin_y, cal.y_sign) == (0.0, 0.0, 1) + + +def test_solve_calibration_returns_none_when_unfittable() -> None: + layers = _floor_block_layers() + # Points so far apart no resolution keeps them on the 6x6 floor block. + points = [(0.0, 0.0), (1000.0, 0.0), (0.0, 1000.0)] + assert solve_calibration(layers, points, resolutions=[2.0]) is None diff --git a/tests/map/test_b01_map_parser.py b/tests/map/test_b01_map_parser.py index 0829182e..2e66cd4f 100644 --- a/tests/map/test_b01_map_parser.py +++ b/tests/map/test_b01_map_parser.py @@ -11,7 +11,14 @@ from PIL import Image from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, _parse_scmap_payload +from roborock.map.b01_grid_layers import LAYER_BACKGROUND, LAYER_FLOOR, LAYER_WALL +from roborock.map.b01_map_parser import ( + B01MapParser, + _parse_scmap_payload, + classify_q7_cell, + decompose_q7_layers, + q7_calibration, +) from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] from roborock.protocols.b01_q7_protocol import create_map_key, decode_map_payload @@ -126,6 +133,31 @@ def test_b01_scmap_parser_maps_observed_schema_fields() -> None: assert not parsed.roomDataInfo[1].HasField("roomName") +def test_classify_q7_cell() -> None: + assert classify_q7_cell(0) == LAYER_BACKGROUND + assert classify_q7_cell(127) == LAYER_WALL + assert classify_q7_cell(128) == LAYER_FLOOR + + +def test_q7_layers_and_calibration_from_fixture() -> None: + """Q7 reuses the shared grid decomposition + reads calibration from mapHead.""" + inflated = gzip.decompress(FIXTURE.read_bytes()) + + layers = decompose_q7_layers(inflated) + assert set(layers.class_counts) == {LAYER_BACKGROUND, LAYER_WALL, LAYER_FLOOR} + assert layers.class_counts[LAYER_FLOOR] > 0 + assert layers.rooms == [] # Q7 raster has no per-room segmentation + + cal = q7_calibration(inflated) + assert cal is not None + # mapHead gives minX=-5, minY=-7, resolution=0.05 -> origin from those. + assert cal.resolution == pytest.approx(0.05, abs=1e-4) + assert cal.origin_x == pytest.approx(5.0 / cal.resolution, abs=1.0) + # World origin (0,0) maps inside the grid. + px, py = cal.world_to_pixel(0.0, 0.0) + assert 0 <= px < layers.width and 0 <= py < layers.height + + def test_b01_map_parser_rejects_invalid_payload() -> None: parser = B01MapParser() with pytest.raises(RoborockException, match="Failed to parse B01 SCMap"): diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py new file mode 100644 index 00000000..96321b72 --- /dev/null +++ b/tests/map/test_b01_q10_map_parser.py @@ -0,0 +1,209 @@ +"""Tests for the Roborock Q10 (B01/ss07) map parser.""" + +from pathlib import Path + +import pytest + +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + Q10Room, + is_map_packet, + is_trace_packet, + lz4_block_decompress, + parse_map_packet, + parse_trace_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" +TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" +TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" +# Real 15-point packet captured from an R1 corridor run (full session path). +TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" + + +def _payload() -> bytes: + return FIXTURE.read_bytes() + + +def _literal_lz4_block(data: bytes) -> bytes: + block = bytearray() + literal_length = len(data) + if literal_length < 15: + block.append(literal_length << 4) + else: + block.append(0xF0) + remaining = literal_length - 15 + while remaining >= 0xFF: + block.append(0xFF) + remaining -= 0xFF + block.append(remaining) + block.extend(data) + return bytes(block) + + +def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[8:10] = width.to_bytes(2, "little") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + +def test_lz4_block_roundtrip_all_literals() -> None: + """A simple all-literals block decodes back to the original bytes.""" + original = bytes(range(60)) * 3 + block = bytearray() + block.append(0x0F << 4) + block.append(len(original) - 15) + block += original + assert lz4_block_decompress(bytes(block)) == original + + +def test_lz4_block_back_reference() -> None: + """Back-references expand runs (e.g. RLE-style repeats).""" + # seq1: 1 literal 'A', then match (offset 1, length 4+4=8) -> 'A' x9. + # seq2: final literals-only token (0 literals) ends the block per LZ4 spec. + block = bytes([0x14, ord("A"), 0x01, 0x00, 0x00]) + assert lz4_block_decompress(block) == b"A" * 9 + + +def test_is_map_packet() -> None: + assert is_map_packet(b"\x01\x01rest") + assert not is_map_packet(b"\x02\x01rest") # trace packet + assert not is_map_packet(b"") + + +def test_parse_map_packet() -> None: + packet = parse_map_packet(_payload()) + assert packet.width == 8 + assert packet.height == 6 + assert packet.map_id == 0x01020304 + assert len(packet.grid) == packet.width * packet.height + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] + + +def test_parse_map_packet_allows_zero_room_metadata() -> None: + """A map can be present before the robot has room segmentation records.""" + grid = bytes([240, 240, 249, 243, 240, 240]) + packet = parse_map_packet(_synthetic_map_payload(width=3, decoded_layout=grid + b"\x01\x00")) + assert packet.width == 3 + assert packet.height == 2 + assert packet.grid == grid + assert packet.rooms == [] + + +def test_room_name_normalization() -> None: + """Firmware ``rr_`` default names are normalized; custom names are titled.""" + assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" + assert Q10Room(id=3, raw_name="bedroom", pixel_value=12, pixel_count=9).name == "Bedroom" + + +def test_room_pixel_count_matches_grid() -> None: + packet = parse_map_packet(_payload()) + for room in packet.rooms: + assert room.pixel_value == (room.id * 4) & 0xFF + assert room.pixel_count == packet.grid.count(room.pixel_value) + + +def test_parser_renders_png_and_room_names() -> None: + parsed = B01Q10MapParser().parse(_payload()) + assert parsed.image_content is not None + assert parsed.image_content[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic + assert parsed.map_data is not None + assert parsed.map_data.additional_parameters["room_names"] == {2: "Living Room", 3: "Bedroom"} + + +def test_parse_rejects_non_map_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 map packet"): + parse_map_packet(b"\x02\x01" + b"\x00" * 40) + + +def test_packet_markers_are_distinct() -> None: + map_payload = _payload() + trace_payload = TRACE_FIXTURE.read_bytes() + assert is_map_packet(map_payload) and not is_trace_packet(map_payload) + assert is_trace_packet(trace_payload) and not is_map_packet(trace_payload) + + +def test_parse_trace_packet_real_single_point() -> None: + """A real ss07 packet captured early in a session has a single path point.""" + trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + assert trace.sequence == 9 + assert [(p.x, p.y) for p in trace.points] == [(169, 0)] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + + +def test_parse_trace_packet_real_session_path() -> None: + """A real 15-point packet (corridor run) decodes the full accumulated path. + + Captured live from an R1: the same session emitted packets of 1, then 3, + then 15 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position. + """ + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + points = [(p.x, p.y) for p in trace.points] + assert len(points) == 15 + assert points[0] == (-34, 0) # oldest + assert points[-1] == (276, -1) # most recent == current position + # After the initial repositioning, x marches steadily down the corridor. + tail_x = [p[0] for p in points[2:]] + assert tail_x == sorted(tail_x) + assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) + + +def test_parse_trace_packet_multi_point() -> None: + """A multi-point packet decodes all points; position is the most recent.""" + trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) + assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + # Signed coordinates are supported (negative x). + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) + + +def test_parse_trace_empty_path_has_no_position() -> None: + header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + trace = parse_trace_packet(header_only) + assert trace.points == [] + assert trace.robot_position is None + + +def test_parse_erase_zones_from_map_packet_tail() -> None: + """Erase rectangles appended after the grid decode to world polygons.""" + rects = [ + [(100, 200), (300, 200), (300, 50), (100, 50)], + [(-40, -10), (10, -10), (10, -60), (-40, -60)], + ] + tail = bytes([len(rects), 4]) + for rect in rects: + for x, y in rect: + tail += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + packet = parse_map_packet(FIXTURE.read_bytes() + tail) + assert [z.vertices for z in packet.erase_zones] == rects # incl. signed coords + + +def test_parse_map_packet_without_erase_tail() -> None: + assert parse_map_packet(FIXTURE.read_bytes()).erase_zones == [] + + +def test_parse_trace_rejects_non_trace_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 trace packet"): + parse_trace_packet(_payload()) + + +def test_parse_trace_rejects_misaligned_points() -> None: + with pytest.raises(RoborockException, match="not 4-byte"): + parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + + +def test_parse_rejects_bad_layout_length() -> None: + payload = bytearray(_payload()) + payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer + with pytest.raises(RoborockException, match="invalid layout block length"): + parse_map_packet(bytes(payload)) diff --git a/tests/map/test_b01_q10_overlays.py b/tests/map/test_b01_q10_overlays.py new file mode 100644 index 00000000..9a3ce3db --- /dev/null +++ b/tests/map/test_b01_q10_overlays.py @@ -0,0 +1,58 @@ +"""Tests for the Q10 vector-overlay (no-go / no-mop / virtual wall) decoder.""" + +import base64 + +from roborock.map.b01_q10_overlays import ( + ZONE_TYPE_NO_GO, + ZONE_TYPE_NO_MOP, + parse_zone_blob, +) + + +def _blob(version: int, records: list[bytes], record_size: int = 18) -> bytes: + body = b"".join(r.ljust(record_size, b"\x00") for r in records) + return bytes([version, len(records)]) + body + + +def _rect(zone_type: int, corners: list[tuple[int, int]]) -> bytes: + out = bytes([zone_type, len(corners)]) + for x, y in corners: + out += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + return out + + +def test_parse_zone_blob_two_typed_rectangles() -> None: + rect_a = _rect(ZONE_TYPE_NO_GO, [(0, 0), (10, 0), (10, 10), (0, 10)]) + rect_b = _rect(ZONE_TYPE_NO_MOP, [(-5, -5), (5, -5), (5, 5), (-5, 5)]) + zones = parse_zone_blob(_blob(1, [rect_a, rect_b])) + assert [z.type for z in zones] == [ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP] + assert zones[0].vertices == [(0, 0), (10, 0), (10, 10), (0, 10)] + assert zones[1].vertices == [(-5, -5), (5, -5), (5, 5), (-5, 5)] # signed coords + + +def test_parse_zone_blob_accepts_base64() -> None: + blob = _blob(1, [_rect(ZONE_TYPE_NO_GO, [(1, 2), (3, 4), (5, 6), (7, 8)])]) + zones = parse_zone_blob(base64.b64encode(blob).decode()) + assert len(zones) == 1 and zones[0].vertices[2] == (5, 6) + + +def test_parse_zone_blob_empty_variants() -> None: + assert parse_zone_blob(None) == [] + assert parse_zone_blob(b"\x00") == [] # device's "no zones" sentinel + assert parse_zone_blob("AA==") == [] # base64 of 0x00 + assert parse_zone_blob(bytes([1, 0, 0])) == [] # version=1, count=0 + + +def test_parse_zone_blob_skips_malformed_record() -> None: + # vertex_count claims 9 verts (needs 38 bytes) but record is only 18 -> skipped. + bad = bytes([ZONE_TYPE_NO_GO, 9]) + b"\x00" * 16 + good = _rect(ZONE_TYPE_NO_GO, [(1, 1), (2, 2), (3, 3), (4, 4)]) + zones = parse_zone_blob(_blob(1, [bad, good])) + assert len(zones) == 1 and zones[0].vertices[0] == (1, 1) + + +def test_parse_zone_blob_real_record_size_inferred() -> None: + """Record size is inferred from total/count (real device uses 38).""" + rect = _rect(ZONE_TYPE_NO_GO, [(100, 200), (300, 200), (300, 50), (100, 50)]) + zones = parse_zone_blob(_blob(1, [rect], record_size=38)) + assert len(zones) == 1 and zones[0].vertices[0] == (100, 200) diff --git a/tests/map/testdata/b01_q10_map.bin b/tests/map/testdata/b01_q10_map.bin new file mode 100644 index 00000000..05ca7083 Binary files /dev/null and b/tests/map/testdata/b01_q10_map.bin differ diff --git a/tests/map/testdata/b01_q10_trace.bin b/tests/map/testdata/b01_q10_trace.bin new file mode 100644 index 00000000..ace261d0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace.bin differ diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin new file mode 100644 index 00000000..8377e6c0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_multi.bin differ diff --git a/tests/map/testdata/b01_q10_trace_session.bin b/tests/map/testdata/b01_q10_trace_session.bin new file mode 100644 index 00000000..a4222e81 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_session.bin differ