diff --git a/roborock/cli.py b/roborock/cli.py index b36b11ce..fa4ff2aa 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -50,6 +50,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute @@ -521,6 +522,45 @@ async def maps(ctx, device_id: str): await _display_v1_trait(context, device_id, lambda v1: v1.maps) +# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to +# ~once per 60-70s, so a single request is answered quickly but rapid re-requests +# may not be. This bounds how long a one-shot CLI command waits for that push. +_Q10_MAP_PUSH_TIMEOUT = 30.0 + + +async def _await_q10_map_push( + properties: Q10PropertiesApi, + predicate: Callable[[], bool], + *, + timeout: float = _Q10_MAP_PUSH_TIMEOUT, + allow_cached_on_timeout: bool = False, +) -> bool: + """Nudge a Q10 to push its map/trace and wait for a fresh update. + + The Q10 map API is entirely push-driven: there is no synchronous get-map + request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, + which the device's subscribe loop feeds into the map trait. Here we register + an update listener, send the request, and wait for a newly pushed update to + satisfy ``predicate``. Returns whether it did within ``timeout``. + """ + loop = asyncio.get_running_loop() + updated: asyncio.Future[None] = loop.create_future() + + def on_update() -> None: + if predicate() and not updated.done(): + updated.set_result(None) + + unsub = properties.map.add_update_listener(on_update) + try: + await properties.refresh() + await asyncio.wait_for(updated, timeout=timeout) + return True + except TimeoutError: + return allow_cached_on_timeout and predicate() + finally: + unsub() + + @session.command() @click.option("--device_id", required=True) @click.option("--output-file", required=True, help="Path to save the map image.") @@ -529,10 +569,22 @@ async def maps(ctx, device_id: str): async def map_image(ctx, device_id: str, output_file: str): """Get device map image and save it to a file.""" context: RoborockContext = ctx.obj - trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) - if trait.image_content: + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) + image_content = properties.map.image_content + else: + v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) + image_content = v1_trait.image_content + if image_content: with open(output_file, "wb") as f: - f.write(trait.image_content) + f.write(image_content) click.echo(f"Map image saved to {output_file}") else: click.echo("No map image content available.") @@ -564,6 +616,39 @@ async def map_data(ctx, device_id: str, include_path: bool): click.echo(dump_json(data_summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.") +@click.pass_context +@async_command +async def q10_position(ctx, device_id: str, include_path: bool): + """Get the current Q10 robot position and live cleaning path. + + The Q10 only streams its position/path while it is actively cleaning, so this + will report that no live trace is available for an idle/docked robot. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path)) + if not got_trace: + click.echo("No live trace available (the robot only reports position while cleaning).") + return + map_trait = properties.map + position = map_trait.robot_position + summary: dict[str, Any] = { + "robot_position": {"x": position.x, "y": position.y} if position else None, + "path_points": len(map_trait.path), + } + if include_path: + summary["path"] = [[p.x, p.y] for p in map_trait.path] + click.echo(dump_json(summary)) + + @session.command() @click.option("--device_id", required=True) @click.pass_context @@ -705,7 +790,20 @@ async def set_child_lock(ctx, device_id: str, enabled: bool): async def rooms(ctx, device_id: str): """Get device room mapping info.""" context: RoborockContext = ctx.obj - await _display_v1_trait(context, device_id, lambda v1: v1.rooms) + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + # A valid map may have no room records, so wait on the map arriving + # (image_content) rather than on rooms being non-empty. + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) + click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) + else: + await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @session.command() @@ -1194,6 +1292,7 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur cli.add_command(maps) cli.add_command(map_image) cli.add_command(map_data) +cli.add_command(q10_position) cli.add_command(consumables) cli.add_command(reset_consumable) cli.add_command(rooms) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 1e0510ba..50ec8e5a 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -19,7 +19,12 @@ async def stream_decoded_responses( mqtt_channel: MqttChannel, ) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT.""" + """Stream decoded DPS messages received via MQTT. + + Messages that are not decodable DPS responses (e.g. protocol-301 + ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw + messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. + """ async for response_message in mqtt_channel.subscribe_stream(): try: diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..791cdd0b 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,17 +4,21 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.roborock_message import RoborockMessage from .command import CommandTrait +from .map import MapContentTrait from .remote import RemoteTrait from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "MapContentTrait", ] _LOGGER = logging.getLogger(__name__) @@ -35,6 +39,9 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + map: MapContentTrait + """Trait for fetching the current parsed map (image + rooms).""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel @@ -42,6 +49,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() + self.map = MapContentTrait() self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: @@ -65,14 +73,32 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop to listen for status updates.""" - async for decoded_dps in stream_decoded_responses(self._channel): - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + """Persistent loop dispatching pushed messages to the read-model traits.""" + async for message in self._channel.subscribe_stream(): + self._handle_message(message) + + def _handle_message(self, message: RoborockMessage) -> None: + """Route a single pushed message to the trait responsible for it. + + Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not + DPS), so they are handled separately from the status DPS stream. The Q10 + is entirely push-driven: there is no synchronous get-map request, the + device just publishes its current map (a ``dpRequestDps`` nudges it to). + """ + if self.map.update_from_map_response(message): + return + + try: + decoded_dps = decode_rpc_response(message) + except RoborockException as ex: + _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) + return + + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + # Notify all traits about a new message and each trait will + # only update what fields that it is responsible for. + # More traits can be added here below. + self.status.update_from_dps(decoded_dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py new file mode 100644 index 00000000..8cbb5d9e --- /dev/null +++ b/roborock/devices/traits/b01/q10/map.py @@ -0,0 +1,148 @@ +"""Map content trait for B01 Q10 devices. + +Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this +trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: + +- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` + messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` + subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. +- ``update_from_map_response`` parses the payload, updates the cached fields and + notifies update listeners (register via :meth:`add_update_listener`). +- ``parse_map_content()`` reparses the cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and + ``raw_api_response`` are readable and reflect the most recently pushed map. + +Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. +""" + +import logging +from dataclasses import dataclass, field + +from vacuum_map_parser_base.map_data import MapData + +from roborock.data import RoborockBase +from roborock.devices.traits.common import TraitUpdateListener +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10Point, + Q10Room, + parse_map_packet, + parse_trace_packet, +) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +_LOGGER = logging.getLogger(__name__) + +_TRUNCATE_LENGTH = 20 + +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing Q10 map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (image metadata + room names).""" + + rooms: list[Q10Room] = field(default_factory=list) + """Rooms (segments) reported by the device, with ids and names.""" + + path: list[Q10Point] = field(default_factory=list) + """Full path of the current cleaning session (oldest point first). + + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" + + robot_position: Q10Point | None = None + """Current robot position (the most recent path point), if known.""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + + +class MapContentTrait(MapContent, TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. + + The Q10 has no synchronous get-map request; the device pushes map and trace + packets, which the ``Q10PropertiesApi`` subscribe loop feeds into + :meth:`update_from_map_response`. Consumers read the cached fields and/or + register a callback with :meth:`add_update_listener` to be notified when new + map content arrives. + """ + + def __init__( + self, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + super().__init__() + TraitUpdateListener.__init__(self, logger=_LOGGER) + self._map_parser = B01Q10MapParser(map_parser_config) + + def update_from_map_response(self, message: RoborockMessage) -> bool: + """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. + + Returns ``True`` if the message was a recognized Q10 map (``01 01``) or + trace (``02 01``) packet (so the caller can stop processing it), and + ``False`` otherwise. Update listeners are notified only when a packet is + parsed successfully. + """ + if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: + return False + marker = message.payload[:2] + if marker == _MAP_PACKET_MARKER: + self.raw_api_response = message.payload + try: + self.parse_map_content() + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) + return True + self._notify_update() + return True + if marker == _TRACE_PACKET_MARKER: + try: + trace = parse_trace_packet(message.payload) + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) + return True + self.path = trace.points + self.robot_position = trace.robot_position + self._notify_update() + return True + return False + + def parse_map_content(self) -> None: + """Reparse the cached raw map payload without performing any I/O.""" + if self.raw_api_response is None: + raise RoborockException("No map payload available; no map has been pushed yet") + + try: + parsed = self._map_parser.parse(self.raw_api_response) + packet = parse_map_packet(self.raw_api_response) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse Q10 map data") from ex + + if parsed.image_content is None: + raise RoborockException("Failed to render Q10 map image") + + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.rooms = packet.rooms diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py new file mode 100644 index 00000000..a56e9024 --- /dev/null +++ b/roborock/map/b01_q10_map_parser.py @@ -0,0 +1,329 @@ +"""Parser for Roborock Q10 (B01/ss07) map packets. + +Q10 devices deliver map data as a protocol-301 ``MAP_RESPONSE`` message (pushed a +few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf +format, the Q10 uses a custom, unencrypted binary packet: + +- ``01 01`` marker, then a ``u32be`` map id and a ``u16le`` grid width. +- A header field at offset 27 (``u16be``) giving the compressed layout length. +- An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it + is ``width * height`` cells of grid data followed by room metadata records. +- Room metadata begins with ``01 `` followed by fixed 47-byte + records (id, hints, ascii name). Each room paints cells with value + ``room_id * 4`` in the grid. + +The packet layout was confirmed against live Q10 captures. The format +documentation that informed this clean-room implementation comes from the +``roborock-qseries-map-bridge`` project (GPL-3.0-or-later): +https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge +""" + +import colorsys +import io +from dataclasses import dataclass, field + +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException + +from .map_parser import ParsedMapData + +_MAP_FILE_FORMAT = "PNG" + +MAP_PACKET_MARKER = b"\x01\x01" +TRACE_PACKET_MARKER = b"\x02\x01" + +_MAP_ID_OFFSET = 2 +_WIDTH_OFFSET = 8 +_COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 +_LAYOUT_COMPRESSED_OFFSET = 29 +_ROOM_RECORD_LENGTH = 47 +_ROOM_NAME_LENGTH_OFFSET = 26 +_MAX_ROOMS = 32 + +# Grid cell values >= this are walls / borders rather than room segments. +_WALL_THRESHOLD = 240 + + +@dataclass +class Q10Room: + """A room (segment) described in a Q10 map packet.""" + + id: int + raw_name: str + pixel_value: int + pixel_count: int + + @property + def name(self) -> str: + """User friendly room name (firmware ``rr_`` defaults are normalized).""" + return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() + + +@dataclass +class Q10MapPacket: + """Decoded contents of a Q10 ``01 01`` map packet.""" + + map_id: int + width: int + height: int + grid: bytes + rooms: list[Q10Room] = field(default_factory=list) + + +@dataclass +class Q10Point: + """A single point in Q10 map/trace coordinate space.""" + + x: int + y: int + + +@dataclass +class Q10TracePacket: + """Decoded contents of a Q10 ``02 01`` cleaning-path packet. + + The robot accumulates the **full path of the current cleaning session** and + serves it in a single packet: ``points`` holds the whole trajectory so far + (oldest first), growing as the robot cleans. This was confirmed live -- a + corridor run produced packets of 1, then 3, then 15 points, each a strict + superset describing the path travelled. Because the robot keeps the path + server-side, a client that connects mid-session still receives the complete + path (this is how the app shows the trail even after a cold launch). + + The robot only emits these while a session is active, so an idle/docked robot + will not produce them. The most recent point is the current robot position. + """ + + points: list[Q10Point] = field(default_factory=list) + sequence: int = 0 + """Session counter (byte 3); increments per cleaning session, tracking the + device clean count. Not a per-packet sequence.""" + + @property + def robot_position(self) -> Q10Point | None: + """The current robot position (the most recent point).""" + return self.points[-1] if self.points else None + + +# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# point pairs forming the accumulated session path. Header layout confirmed +# against live ss07 captures: byte 3 is a session counter (tracks the device +# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point +# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body +# rather than trusting the count field, so a truncated tail can't desync it. +# NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) +# did not match this firmware -- this 10-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 10 +_TRACE_SEQUENCE_OFFSET = 3 + + +def is_map_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 full-map (``01 01``) packet.""" + return payload[:2] == MAP_PACKET_MARKER + + +def is_trace_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 live trace (``02 01``) packet.""" + return payload[:2] == TRACE_PACKET_MARKER + + +def parse_trace_packet(payload: bytes) -> Q10TracePacket: + """Parse a Q10 ``02 01`` trace packet into path points + robot position.""" + if not is_trace_packet(payload): + raise RoborockException("Payload is not a Q10 trace packet") + if len(payload) < _TRACE_HEADER_LENGTH: + raise RoborockException("Q10 trace packet is shorter than its header") + body = payload[_TRACE_HEADER_LENGTH:] + if len(body) % 4: + raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + + points = [ + Q10Point( + x=int.from_bytes(body[offset : offset + 2], "big", signed=True), + y=int.from_bytes(body[offset + 2 : offset + 4], "big", signed=True), + ) + for offset in range(0, len(body), 4) + ] + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + + +def lz4_block_decompress(data: bytes) -> bytes: + """Decompress a raw LZ4 *block* (no frame header). + + The Q10 map grid is stored as a single LZ4 block. This implements the + standard LZ4 block format so we don't add a native dependency. + """ + index = 0 + output = bytearray() + + def read_length(value: int) -> int: + nonlocal index + if value != 0x0F: + return value + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading length") + part = data[index] + index += 1 + value += part + if part != 0xFF: + return value + + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading token") + token = data[index] + index += 1 + + literal_length = read_length((token >> 4) & 0x0F) + end = index + literal_length + if end > len(data): + raise RoborockException("Truncated LZ4 block while reading literals") + output.extend(data[index:end]) + index = end + + if index == len(data): + return bytes(output) + if index + 2 > len(data): + raise RoborockException("Truncated LZ4 block while reading match offset") + + offset = data[index] | (data[index + 1] << 8) + index += 2 + if offset == 0 or offset > len(output): + raise RoborockException("Invalid LZ4 back-reference offset") + + match_length = read_length(token & 0x0F) + 4 + for _ in range(match_length): + output.append(output[-offset]) + + +def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: + """Split the inflated layout into (height, grid, room_data). + + The grid is ``width * height`` cells; the remaining bytes are room records + introduced by an ``01 `` marker. The room count is unknown up + front, so we search for the split that makes the grid rectangular and lines + up with the marker. + """ + for room_count in range(0, _MAX_ROOMS + 1): + room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH + area = len(decoded) - room_data_length + if area <= 0 or area % width: + continue + room_data = decoded[area:] + if room_data[0] == 1 and room_data[1] == room_count: + return area // width, decoded[:area], room_data + raise RoborockException("Could not infer Q10 layout dimensions / room records") + + +def _parse_rooms(room_data: bytes, grid: bytes) -> list[Q10Room]: + rooms: list[Q10Room] = [] + room_count = room_data[1] + for index in range(room_count): + start = 2 + index * _ROOM_RECORD_LENGTH + record = room_data[start : start + _ROOM_RECORD_LENGTH] + room_id = int.from_bytes(record[0:2], "big") + name_length = record[_ROOM_NAME_LENGTH_OFFSET] + raw_name = record[27 : 27 + name_length].decode("utf-8", errors="replace") + pixel_value = (room_id * 4) & 0xFF + rooms.append( + Q10Room( + id=room_id, + raw_name=raw_name, + pixel_value=pixel_value, + pixel_count=grid.count(pixel_value), + ) + ) + return rooms + + +def parse_map_packet(payload: bytes) -> Q10MapPacket: + """Parse a Q10 ``01 01`` map packet into grid + room metadata.""" + if len(payload) < _LAYOUT_COMPRESSED_OFFSET or not is_map_packet(payload): + raise RoborockException("Payload is not a Q10 map packet") + + map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "little") + if width <= 0: + raise RoborockException("Q10 map packet has invalid width") + + compressed_length = int.from_bytes( + payload[_COMPRESSED_LAYOUT_LENGTH_OFFSET : _COMPRESSED_LAYOUT_LENGTH_OFFSET + 2], "big" + ) + layout_end = _LAYOUT_COMPRESSED_OFFSET + compressed_length + if compressed_length <= 0 or layout_end > len(payload): + raise RoborockException("Q10 map packet has invalid layout block length") + + decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) + height, grid, room_data = _infer_layout(decoded, width) + rooms = _parse_rooms(room_data, grid) + return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + + +@dataclass +class B01Q10MapParserConfig: + """Configuration for the Q10 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01Q10MapParser: + """Decoder/renderer for Q10 ``MAP_RESPONSE`` (protocol 301) payloads.""" + + def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: + self._config = config or B01Q10MapParserConfig() + + def parse(self, payload: bytes) -> ParsedMapData: + """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" + packet = parse_map_packet(payload) + image = self._render(packet) + + map_data = MapData() + map_data.image = ImageData( + size=packet.width * packet.height, + top=0, + left=0, + height=packet.height, + width=packet.width, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + room_names = {room.id: room.name for room in packet.rooms} + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) + + def _render(self, packet: Q10MapPacket) -> Image.Image: + """Render the Q10 grid: rooms get distinct colors, walls white, rest dark.""" + palette = _build_palette(packet.grid) + rgb = bytearray() + for value in packet.grid: + rgb.extend(palette[value]) + img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + scale = self._config.map_scale + if scale > 1: + img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) + return img + + +def _build_palette(grid: bytes) -> list[tuple[int, int, int]]: + """Map each grid value to an RGB color (rooms distinct, walls white).""" + palette: list[tuple[int, int, int]] = [(28, 30, 38)] * 256 # default: unknown/outside + room_values = sorted({v for v in set(grid) if 0 < v < _WALL_THRESHOLD}) + for index, value in enumerate(room_values): + hue = (index * 0.139) % 1.0 + r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95) + palette[value] = (int(r * 255), int(g * 255), int(b * 255)) + for value in range(_WALL_THRESHOLD, 256): + palette[value] = (235, 235, 240) # walls / borders + palette[0] = (28, 30, 38) + return palette diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py new file mode 100644 index 00000000..8b3be279 --- /dev/null +++ b/tests/devices/traits/b01/q10/test_map.py @@ -0,0 +1,199 @@ +"""Tests for the Q10 B01 map content trait. + +The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages +and the trait updates its cached state from them via ``update_from_map_response`` +(there is no synchronous get-map request). +""" + +import asyncio +from collections.abc import AsyncGenerator +from pathlib import Path +from unittest.mock import AsyncMock, Mock + +import pytest + +from roborock.cli import _await_q10_map_push, cli +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create +from roborock.devices.traits.b01.q10.map import MapContentTrait +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import Q10Point +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") + + +def _map_message( + payload: bytes, protocol: RoborockMessageProtocol = RoborockMessageProtocol.MAP_RESPONSE +) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") + + +def test_update_from_map_response_populates_image_and_rooms() -> None: + """A pushed 01 01 map packet populates the image, rooms and map data.""" + payload = FIXTURE.read_bytes() + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + assert trait.update_from_map_response(_map_message(payload)) is True + + assert trait.raw_api_response == payload + assert trait.image_content is not None + assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} + assert trait.map_data is not None + assert len(updates) == 1 + + +def test_update_from_map_response_populates_path_and_position() -> None: + """A pushed 02 01 trace packet populates the path and robot position.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + assert trait.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) is True + + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert len(updates) == 1 + + +def test_update_from_map_response_ignores_non_map_messages() -> None: + """Non-MAP_RESPONSE messages are left for the status path to handle.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) + assert trait.update_from_map_response(rpc) is False + + # An unrecognized MAP_RESPONSE marker is also not consumed. + assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False + + assert trait.image_content is None + assert not trait.path + assert not updates + + +def test_parse_without_data_raises() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map payload available"): + trait.parse_map_content() + + +def test_q10_position_is_available_as_top_level_cli_command() -> None: + assert "q10-position" in cli.commands + + +# --- CLI push waiting -------------------------------------------------------- + + +class _FakeQ10Properties: + def __init__(self) -> None: + self.map = MapContentTrait() + self.refresh_count = 0 + + async def refresh(self) -> None: + self.refresh_count += 1 + + +class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): + async def refresh(self) -> None: + await super().refresh() + self.map.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) + + +async def test_await_q10_map_push_waits_for_fresh_update() -> None: + """A cached trace alone is not treated as a successful new map push.""" + properties = _FakeQ10Properties() + properties.map.path = [Q10Point(1, 2)] + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is False + assert properties.refresh_count == 1 + + +async def test_await_q10_map_push_returns_true_after_update() -> None: + properties = _FakeQ10PropertiesWithTrace() + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is True + assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] + + +async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: + properties = _FakeQ10Properties() + properties.map.image_content = b"cached-png" + + got_map = await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + timeout=0.01, + allow_cached_on_timeout=True, + ) + + assert got_map is True + assert properties.refresh_count == 1 + + +# --- Integration through the Q10PropertiesApi subscribe loop ----------------- + + +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_channel(message_queue: asyncio.Queue[RoborockMessage]) -> AsyncMock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + + channel = AsyncMock() + channel.subscribe_stream = Mock(return_value=mock_stream()) + return channel + + +@pytest.fixture +async def q10_api(mock_channel: AsyncMock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def _wait_for(predicate, timeout: float = 2.0) -> None: + async with asyncio.timeout(timeout): + while not predicate(): + await asyncio.sleep(0.01) + + +async def test_subscribe_loop_routes_map_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A map pushed onto the stream is routed to the map trait by the loop.""" + assert q10_api.map.image_content is None + + message_queue.put_nowait(_map_message(FIXTURE.read_bytes())) + + await _wait_for(lambda: q10_api.map.image_content is not None) + assert {room.id: room.name for room in q10_api.map.rooms} == {2: "Living Room", 3: "Bedroom"} + + +async def test_subscribe_loop_routes_trace_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A trace pushed onto the stream is routed to the map trait by the loop.""" + assert not q10_api.map.path + + message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) + + await _wait_for(lambda: bool(q10_api.map.path)) + assert q10_api.map.robot_position is not None diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py new file mode 100644 index 00000000..4dbcc9bf --- /dev/null +++ b/tests/map/test_b01_q10_map_parser.py @@ -0,0 +1,191 @@ +"""Tests for the Roborock Q10 (B01/ss07) map parser.""" + +from pathlib import Path + +import pytest + +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + Q10Room, + is_map_packet, + is_trace_packet, + lz4_block_decompress, + parse_map_packet, + parse_trace_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" +TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" +TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" +# Real 15-point packet captured from an R1 corridor run (full session path). +TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" + + +def _payload() -> bytes: + return FIXTURE.read_bytes() + + +def _literal_lz4_block(data: bytes) -> bytes: + block = bytearray() + literal_length = len(data) + if literal_length < 15: + block.append(literal_length << 4) + else: + block.append(0xF0) + remaining = literal_length - 15 + while remaining >= 0xFF: + block.append(0xFF) + remaining -= 0xFF + block.append(remaining) + block.extend(data) + return bytes(block) + + +def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[8:10] = width.to_bytes(2, "little") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + +def test_lz4_block_roundtrip_all_literals() -> None: + """A simple all-literals block decodes back to the original bytes.""" + original = bytes(range(60)) * 3 + block = bytearray() + block.append(0x0F << 4) + block.append(len(original) - 15) + block += original + assert lz4_block_decompress(bytes(block)) == original + + +def test_lz4_block_back_reference() -> None: + """Back-references expand runs (e.g. RLE-style repeats).""" + # seq1: 1 literal 'A', then match (offset 1, length 4+4=8) -> 'A' x9. + # seq2: final literals-only token (0 literals) ends the block per LZ4 spec. + block = bytes([0x14, ord("A"), 0x01, 0x00, 0x00]) + assert lz4_block_decompress(block) == b"A" * 9 + + +def test_is_map_packet() -> None: + assert is_map_packet(b"\x01\x01rest") + assert not is_map_packet(b"\x02\x01rest") # trace packet + assert not is_map_packet(b"") + + +def test_parse_map_packet() -> None: + packet = parse_map_packet(_payload()) + assert packet.width == 8 + assert packet.height == 6 + assert packet.map_id == 0x01020304 + assert len(packet.grid) == packet.width * packet.height + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] + + +def test_parse_map_packet_allows_zero_room_metadata() -> None: + """A map can be present before the robot has room segmentation records.""" + grid = bytes([240, 240, 249, 243, 240, 240]) + packet = parse_map_packet(_synthetic_map_payload(width=3, decoded_layout=grid + b"\x01\x00")) + assert packet.width == 3 + assert packet.height == 2 + assert packet.grid == grid + assert packet.rooms == [] + + +def test_room_name_normalization() -> None: + """Firmware ``rr_`` default names are normalized; custom names are titled.""" + assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" + assert Q10Room(id=3, raw_name="bedroom", pixel_value=12, pixel_count=9).name == "Bedroom" + + +def test_room_pixel_count_matches_grid() -> None: + packet = parse_map_packet(_payload()) + for room in packet.rooms: + assert room.pixel_value == (room.id * 4) & 0xFF + assert room.pixel_count == packet.grid.count(room.pixel_value) + + +def test_parser_renders_png_and_room_names() -> None: + parsed = B01Q10MapParser().parse(_payload()) + assert parsed.image_content is not None + assert parsed.image_content[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic + assert parsed.map_data is not None + assert parsed.map_data.additional_parameters["room_names"] == {2: "Living Room", 3: "Bedroom"} + + +def test_parse_rejects_non_map_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 map packet"): + parse_map_packet(b"\x02\x01" + b"\x00" * 40) + + +def test_packet_markers_are_distinct() -> None: + map_payload = _payload() + trace_payload = TRACE_FIXTURE.read_bytes() + assert is_map_packet(map_payload) and not is_trace_packet(map_payload) + assert is_trace_packet(trace_payload) and not is_map_packet(trace_payload) + + +def test_parse_trace_packet_real_single_point() -> None: + """A real ss07 packet captured early in a session has a single path point.""" + trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + assert trace.sequence == 9 + assert [(p.x, p.y) for p in trace.points] == [(169, 0)] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + + +def test_parse_trace_packet_real_session_path() -> None: + """A real 15-point packet (corridor run) decodes the full accumulated path. + + Captured live from an R1: the same session emitted packets of 1, then 3, + then 15 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position. + """ + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + points = [(p.x, p.y) for p in trace.points] + assert len(points) == 15 + assert points[0] == (-34, 0) # oldest + assert points[-1] == (276, -1) # most recent == current position + # After the initial repositioning, x marches steadily down the corridor. + tail_x = [p[0] for p in points[2:]] + assert tail_x == sorted(tail_x) + assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) + + +def test_parse_trace_packet_multi_point() -> None: + """A multi-point packet decodes all points; position is the most recent.""" + trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) + assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + # Signed coordinates are supported (negative x). + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) + + +def test_parse_trace_empty_path_has_no_position() -> None: + header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + trace = parse_trace_packet(header_only) + assert trace.points == [] + assert trace.robot_position is None + + +def test_parse_trace_rejects_non_trace_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 trace packet"): + parse_trace_packet(_payload()) + + +def test_parse_trace_rejects_misaligned_points() -> None: + with pytest.raises(RoborockException, match="not 4-byte"): + parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + + +def test_parse_rejects_bad_layout_length() -> None: + payload = bytearray(_payload()) + payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer + with pytest.raises(RoborockException, match="invalid layout block length"): + parse_map_packet(bytes(payload)) diff --git a/tests/map/testdata/b01_q10_map.bin b/tests/map/testdata/b01_q10_map.bin new file mode 100644 index 00000000..05ca7083 Binary files /dev/null and b/tests/map/testdata/b01_q10_map.bin differ diff --git a/tests/map/testdata/b01_q10_trace.bin b/tests/map/testdata/b01_q10_trace.bin new file mode 100644 index 00000000..ace261d0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace.bin differ diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin new file mode 100644 index 00000000..8377e6c0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_multi.bin differ diff --git a/tests/map/testdata/b01_q10_trace_session.bin b/tests/map/testdata/b01_q10_trace_session.bin new file mode 100644 index 00000000..a4222e81 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_session.bin differ