From bd4b3f98583ffcb4e36a4ab71b94551f30a24246 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 11:26:41 +0400 Subject: [PATCH 1/6] feat: add Q10 (B01/ss07) map support with rooms and rendered image Brings Q10 maps toward parity with V1 devices. Verified end-to-end against two physical Q10 (roborock.vacuum.ss07) robots. Protocol (reverse-engineered from live captures): - Requesting device state (dpRequestDps) makes the robot push its current map as a protocol-301 MAP_RESPONSE a few seconds later (firmware throttles to ~once per minute). - The "01 01" map packet carries a u32be map id, u16le grid width, and an LZ4-block-compressed occupancy grid followed by 47-byte room records (id + ascii name); room cells use value room_id*4. The payload is unencrypted, unlike the Q7 SCMap protobuf format. Changes: - roborock/map/b01_q10_map_parser.py: clean LZ4 block decoder + packet parser + renderer producing a PNG and MapData with room names. - roborock/devices/rpc/b01_q10_channel.py: request_map() triggers and awaits the MAP_RESPONSE push. - roborock/devices/traits/b01/q10/map.py: MapContentTrait (refresh/parse/image/ rooms), wired into Q10PropertiesApi. - cli: `map-image` and `rooms` now work for Q10 devices. - Tests + a synthetic (no-PII) map fixture. Map packet format documentation credit: the roborock-qseries-map-bridge project (GPL-3.0): https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge --- roborock/cli.py | 23 +- roborock/devices/rpc/b01_q10_channel.py | 36 +++ roborock/devices/traits/b01/q10/__init__.py | 6 + roborock/devices/traits/b01/q10/map.py | 87 +++++++ roborock/map/b01_q10_map_parser.py | 257 ++++++++++++++++++++ tests/devices/traits/b01/q10/test_map.py | 67 +++++ tests/map/test_b01_q10_map_parser.py | 86 +++++++ tests/map/testdata/b01_q10_map.bin | Bin 0 -> 175 bytes 8 files changed, 558 insertions(+), 4 deletions(-) create mode 100644 roborock/devices/traits/b01/q10/map.py create mode 100644 roborock/map/b01_q10_map_parser.py create mode 100644 tests/devices/traits/b01/q10/test_map.py create mode 100644 tests/map/test_b01_q10_map_parser.py create mode 100644 tests/map/testdata/b01_q10_map.bin diff --git a/roborock/cli.py b/roborock/cli.py index b36b11ce..1ca8ab11 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -529,10 +529,18 @@ async def maps(ctx, device_id: str): async def map_image(ctx, device_id: str, output_file: str): """Get device map image and save it to a file.""" context: RoborockContext = ctx.obj - trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) - if trait.image_content: + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + map_trait = device.b01_q10_properties.map + await map_trait.refresh() + image_content = map_trait.image_content + else: + v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) + image_content = v1_trait.image_content + if image_content: with open(output_file, "wb") as f: - f.write(trait.image_content) + f.write(image_content) click.echo(f"Map image saved to {output_file}") else: click.echo("No map image content available.") @@ -705,7 +713,14 @@ async def set_child_lock(ctx, device_id: str, enabled: bool): async def rooms(ctx, device_id: str): """Get device room mapping info.""" context: RoborockContext = ctx.obj - await _display_v1_trait(context, device_id, lambda v1: v1.rooms) + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + map_trait = device.b01_q10_properties.map + await map_trait.refresh() + click.echo(dump_json({room.id: room.name for room in map_trait.rooms})) + else: + await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @session.command() diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 1e0510ba..79cb5dde 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -1,5 +1,6 @@ """Thin wrapper around the MQTT channel for Roborock B01 Q10 devices.""" +import asyncio import logging from collections.abc import AsyncGenerator from typing import Any @@ -12,9 +13,15 @@ decode_rpc_response, encode_mqtt_payload, ) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) +# Requesting the device state (dpRequestDps) also makes the robot push its current +# map as a separate MAP_RESPONSE message a few seconds later. Q10 firmware throttles +# these pushes (~60-70s between maps), so callers should not poll tightly. +_MAP_TIMEOUT = 20.0 + async def stream_decoded_responses( mqtt_channel: MqttChannel, @@ -34,6 +41,35 @@ async def stream_decoded_responses( yield decoded_dps +async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the current map and return the raw ``MAP_RESPONSE`` payload. + + The Q10 does not have a dedicated "get map" command. Instead, requesting the + device state (``dpRequestDps``) triggers the robot to push its current map as + a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. This subscribes + for that push, sends the trigger, and resolves on the first map message. + """ + if timeout is None: + timeout = _MAP_TIMEOUT + loop = asyncio.get_running_loop() + future: asyncio.Future[bytes] = loop.create_future() + + def on_message(message: RoborockMessage) -> None: + if future.done(): + return + if message.protocol == RoborockMessageProtocol.MAP_RESPONSE and message.payload: + future.set_result(message.payload) + + unsub = await mqtt_channel.subscribe(on_message) + try: + await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) + return await asyncio.wait_for(future, timeout=timeout) + except TimeoutError as ex: + raise RoborockException(f"Timed out waiting for Q10 map after {timeout}s") from ex + finally: + unsub() + + async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..27b2af04 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -9,12 +9,14 @@ from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait +from .map import MapContentTrait from .remote import RemoteTrait from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "MapContentTrait", ] _LOGGER = logging.getLogger(__name__) @@ -35,6 +37,9 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + map: MapContentTrait + """Trait for fetching the current parsed map (image + rooms).""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel @@ -42,6 +47,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() + self.map = MapContentTrait(channel) self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py new file mode 100644 index 00000000..fdd8fcca --- /dev/null +++ b/roborock/devices/traits/b01/q10/map.py @@ -0,0 +1,87 @@ +"""Map content trait for B01 Q10 devices. + +This mirrors the v1 / Q7 ``MapContentTrait`` contract: +- ``refresh()`` performs I/O and populates cached fields. +- ``parse_map_content()`` reparses cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms`` and ``raw_api_response`` are readable. + +Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. +The raw payload is retrieved by :func:`request_map`, which triggers the device +to push its current map. +""" + +from dataclasses import dataclass, field + +from vacuum_map_parser_base.map_data import MapData + +from roborock.data import RoborockBase +from roborock.devices.rpc.b01_q10_channel import request_map +from roborock.devices.traits import Trait +from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import B01Q10MapParser, B01Q10MapParserConfig, Q10Room, parse_map_packet + +_TRUNCATE_LENGTH = 20 + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing Q10 map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (image metadata + room names).""" + + rooms: list[Q10Room] = field(default_factory=list) + """Rooms (segments) reported by the device, with ids and names.""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + + +class MapContentTrait(MapContent, Trait): + """Trait for fetching parsed map content for Q10 devices.""" + + def __init__( + self, + channel: MqttChannel, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + super().__init__() + self._channel = channel + self._map_parser = B01Q10MapParser(map_parser_config) + + async def refresh(self) -> None: + """Fetch, decode, and parse the current map payload.""" + raw_payload = await request_map(self._channel) + self.raw_api_response = raw_payload + self.parse_map_content() + + def parse_map_content(self) -> None: + """Reparse the cached raw map payload without performing any I/O.""" + if self.raw_api_response is None: + raise RoborockException("No map payload available; call refresh() first") + + try: + parsed = self._map_parser.parse(self.raw_api_response) + packet = parse_map_packet(self.raw_api_response) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse Q10 map data") from ex + + if parsed.image_content is None: + raise RoborockException("Failed to render Q10 map image") + + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.rooms = packet.rooms diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py new file mode 100644 index 00000000..bb747959 --- /dev/null +++ b/roborock/map/b01_q10_map_parser.py @@ -0,0 +1,257 @@ +"""Parser for Roborock Q10 (B01/ss07) map packets. + +Q10 devices deliver map data as a protocol-301 ``MAP_RESPONSE`` message (pushed a +few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf +format, the Q10 uses a custom, unencrypted binary packet: + +- ``01 01`` marker, then a ``u32be`` map id and a ``u16le`` grid width. +- A header field at offset 27 (``u16be``) giving the compressed layout length. +- An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it + is ``width * height`` cells of grid data followed by room metadata records. +- Room metadata begins with ``01 `` followed by fixed 47-byte + records (id, hints, ascii name). Each room paints cells with value + ``room_id * 4`` in the grid. + +The packet layout was confirmed against live Q10 captures. The format +documentation that informed this clean-room implementation comes from the +``roborock-qseries-map-bridge`` project (GPL-3.0-or-later): +https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge +""" + +import colorsys +import io +from dataclasses import dataclass, field + +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException + +from .map_parser import ParsedMapData + +_MAP_FILE_FORMAT = "PNG" + +MAP_PACKET_MARKER = b"\x01\x01" +TRACE_PACKET_MARKER = b"\x02\x01" + +_MAP_ID_OFFSET = 2 +_WIDTH_OFFSET = 8 +_COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 +_LAYOUT_COMPRESSED_OFFSET = 29 +_ROOM_RECORD_LENGTH = 47 +_ROOM_NAME_LENGTH_OFFSET = 26 +_MAX_ROOMS = 32 + +# Grid cell values >= this are walls / borders rather than room segments. +_WALL_THRESHOLD = 240 + + +@dataclass +class Q10Room: + """A room (segment) described in a Q10 map packet.""" + + id: int + raw_name: str + pixel_value: int + pixel_count: int + + @property + def name(self) -> str: + """User friendly room name (firmware ``rr_`` defaults are normalized).""" + return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() + + +@dataclass +class Q10MapPacket: + """Decoded contents of a Q10 ``01 01`` map packet.""" + + map_id: int + width: int + height: int + grid: bytes + rooms: list[Q10Room] = field(default_factory=list) + + +def is_map_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 full-map (``01 01``) packet.""" + return payload[:2] == MAP_PACKET_MARKER + + +def lz4_block_decompress(data: bytes) -> bytes: + """Decompress a raw LZ4 *block* (no frame header). + + The Q10 map grid is stored as a single LZ4 block. This implements the + standard LZ4 block format so we don't add a native dependency. + """ + index = 0 + output = bytearray() + + def read_length(value: int) -> int: + nonlocal index + if value != 0x0F: + return value + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading length") + part = data[index] + index += 1 + value += part + if part != 0xFF: + return value + + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading token") + token = data[index] + index += 1 + + literal_length = read_length((token >> 4) & 0x0F) + end = index + literal_length + if end > len(data): + raise RoborockException("Truncated LZ4 block while reading literals") + output.extend(data[index:end]) + index = end + + if index == len(data): + return bytes(output) + if index + 2 > len(data): + raise RoborockException("Truncated LZ4 block while reading match offset") + + offset = data[index] | (data[index + 1] << 8) + index += 2 + if offset == 0 or offset > len(output): + raise RoborockException("Invalid LZ4 back-reference offset") + + match_length = read_length(token & 0x0F) + 4 + for _ in range(match_length): + output.append(output[-offset]) + + +def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: + """Split the inflated layout into (height, grid, room_data). + + The grid is ``width * height`` cells; the remaining bytes are room records + introduced by an ``01 `` marker. The room count is unknown up + front, so we search for the split that makes the grid rectangular and lines + up with the marker. + """ + for room_count in range(1, _MAX_ROOMS): + room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH + area = len(decoded) - room_data_length + if area <= 0 or area % width: + continue + room_data = decoded[area:] + if room_data[0] == 1 and room_data[1] == room_count: + return area // width, decoded[:area], room_data + raise RoborockException("Could not infer Q10 layout dimensions / room records") + + +def _parse_rooms(room_data: bytes, grid: bytes) -> list[Q10Room]: + rooms: list[Q10Room] = [] + room_count = room_data[1] + for index in range(room_count): + start = 2 + index * _ROOM_RECORD_LENGTH + record = room_data[start : start + _ROOM_RECORD_LENGTH] + room_id = int.from_bytes(record[0:2], "big") + name_length = record[_ROOM_NAME_LENGTH_OFFSET] + raw_name = record[27 : 27 + name_length].decode("utf-8", errors="replace") + pixel_value = (room_id * 4) & 0xFF + rooms.append( + Q10Room( + id=room_id, + raw_name=raw_name, + pixel_value=pixel_value, + pixel_count=grid.count(pixel_value), + ) + ) + return rooms + + +def parse_map_packet(payload: bytes) -> Q10MapPacket: + """Parse a Q10 ``01 01`` map packet into grid + room metadata.""" + if len(payload) < _LAYOUT_COMPRESSED_OFFSET or not is_map_packet(payload): + raise RoborockException("Payload is not a Q10 map packet") + + map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "little") + if width <= 0: + raise RoborockException("Q10 map packet has invalid width") + + compressed_length = int.from_bytes( + payload[_COMPRESSED_LAYOUT_LENGTH_OFFSET : _COMPRESSED_LAYOUT_LENGTH_OFFSET + 2], "big" + ) + layout_end = _LAYOUT_COMPRESSED_OFFSET + compressed_length + if compressed_length <= 0 or layout_end > len(payload): + raise RoborockException("Q10 map packet has invalid layout block length") + + decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) + height, grid, room_data = _infer_layout(decoded, width) + rooms = _parse_rooms(room_data, grid) + return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + + +@dataclass +class B01Q10MapParserConfig: + """Configuration for the Q10 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01Q10MapParser: + """Decoder/renderer for Q10 ``MAP_RESPONSE`` (protocol 301) payloads.""" + + def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: + self._config = config or B01Q10MapParserConfig() + + def parse(self, payload: bytes) -> ParsedMapData: + """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" + packet = parse_map_packet(payload) + image = self._render(packet) + + map_data = MapData() + map_data.image = ImageData( + size=packet.width * packet.height, + top=0, + left=0, + height=packet.height, + width=packet.width, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + room_names = {room.id: room.name for room in packet.rooms} + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) + + def _render(self, packet: Q10MapPacket) -> Image.Image: + """Render the Q10 grid: rooms get distinct colors, walls white, rest dark.""" + palette = _build_palette(packet.grid) + rgb = bytearray() + for value in packet.grid: + rgb.extend(palette[value]) + img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + scale = self._config.map_scale + if scale > 1: + img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) + return img + + +def _build_palette(grid: bytes) -> list[tuple[int, int, int]]: + """Map each grid value to an RGB color (rooms distinct, walls white).""" + palette: list[tuple[int, int, int]] = [(28, 30, 38)] * 256 # default: unknown/outside + room_values = sorted({v for v in set(grid) if 0 < v < _WALL_THRESHOLD}) + for index, value in enumerate(room_values): + hue = (index * 0.139) % 1.0 + r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95) + palette[value] = (int(r * 255), int(g * 255), int(b * 255)) + for value in range(_WALL_THRESHOLD, 256): + palette[value] = (235, 235, 240) # walls / borders + palette[0] = (28, 30, 38) + return palette diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py new file mode 100644 index 00000000..28c0bbea --- /dev/null +++ b/tests/devices/traits/b01/q10/test_map.py @@ -0,0 +1,67 @@ +"""Tests for the Q10 B01 map content trait.""" + +from pathlib import Path +from typing import cast + +import pytest + +from roborock.devices.traits.b01.q10.map import MapContentTrait +from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") + + +@pytest.fixture +def fake_channel() -> FakeChannel: + return FakeChannel() + + +def _trait(channel: FakeChannel) -> MapContentTrait: + return MapContentTrait(cast(MqttChannel, channel)) + + +def _map_message(payload: bytes) -> RoborockMessage: + return RoborockMessage( + protocol=RoborockMessageProtocol.MAP_RESPONSE, + payload=payload, + version=b"B01", + ) + + +async def test_map_refresh_populates_image_and_rooms(fake_channel: FakeChannel) -> None: + """refresh() triggers the device push, then parses the map payload.""" + payload = FIXTURE.read_bytes() + fake_channel.response_queue.append(_map_message(payload)) + + trait = _trait(fake_channel) + await trait.refresh() + + assert trait.raw_api_response == payload + assert trait.image_content is not None + assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} + assert trait.map_data is not None + + # The refresh trigger is a dpRequestDps (code 102) request. + assert len(fake_channel.published_messages) == 1 + trigger = fake_channel.published_messages[0].payload + assert trigger is not None and b'"102"' in trigger + + +async def test_map_refresh_times_out_without_response( + fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch +) -> None: + """If the device never pushes a map, refresh raises a clear error.""" + monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) + trait = _trait(fake_channel) # no queued response -> times out + with pytest.raises(RoborockException, match="Timed out waiting for Q10 map"): + await trait.refresh() + + +def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: + trait = _trait(fake_channel) + with pytest.raises(RoborockException, match="No map payload available"): + trait.parse_map_content() diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py new file mode 100644 index 00000000..a3ab89b1 --- /dev/null +++ b/tests/map/test_b01_q10_map_parser.py @@ -0,0 +1,86 @@ +"""Tests for the Roborock Q10 (B01/ss07) map parser.""" + +from pathlib import Path + +import pytest + +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + Q10Room, + is_map_packet, + lz4_block_decompress, + parse_map_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" + + +def _payload() -> bytes: + return FIXTURE.read_bytes() + + +def test_lz4_block_roundtrip_all_literals() -> None: + """A simple all-literals block decodes back to the original bytes.""" + original = bytes(range(60)) * 3 + block = bytearray() + block.append(0x0F << 4) + block.append(len(original) - 15) + block += original + assert lz4_block_decompress(bytes(block)) == original + + +def test_lz4_block_back_reference() -> None: + """Back-references expand runs (e.g. RLE-style repeats).""" + # seq1: 1 literal 'A', then match (offset 1, length 4+4=8) -> 'A' x9. + # seq2: final literals-only token (0 literals) ends the block per LZ4 spec. + block = bytes([0x14, ord("A"), 0x01, 0x00, 0x00]) + assert lz4_block_decompress(block) == b"A" * 9 + + +def test_is_map_packet() -> None: + assert is_map_packet(b"\x01\x01rest") + assert not is_map_packet(b"\x02\x01rest") # trace packet + assert not is_map_packet(b"") + + +def test_parse_map_packet() -> None: + packet = parse_map_packet(_payload()) + assert packet.width == 8 + assert packet.height == 6 + assert packet.map_id == 0x01020304 + assert len(packet.grid) == packet.width * packet.height + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] + + +def test_room_name_normalization() -> None: + """Firmware ``rr_`` default names are normalized; custom names are titled.""" + assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" + assert Q10Room(id=3, raw_name="bedroom", pixel_value=12, pixel_count=9).name == "Bedroom" + + +def test_room_pixel_count_matches_grid() -> None: + packet = parse_map_packet(_payload()) + for room in packet.rooms: + assert room.pixel_value == (room.id * 4) & 0xFF + assert room.pixel_count == packet.grid.count(room.pixel_value) + + +def test_parser_renders_png_and_room_names() -> None: + parsed = B01Q10MapParser().parse(_payload()) + assert parsed.image_content is not None + assert parsed.image_content[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic + assert parsed.map_data is not None + assert parsed.map_data.additional_parameters["room_names"] == {2: "Living Room", 3: "Bedroom"} + + +def test_parse_rejects_non_map_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 map packet"): + parse_map_packet(b"\x02\x01" + b"\x00" * 40) + + +def test_parse_rejects_bad_layout_length() -> None: + payload = bytearray(_payload()) + payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer + with pytest.raises(RoborockException, match="invalid layout block length"): + parse_map_packet(bytes(payload)) diff --git a/tests/map/testdata/b01_q10_map.bin b/tests/map/testdata/b01_q10_map.bin new file mode 100644 index 0000000000000000000000000000000000000000..05ca70835a07073c39d107a0def01ed65a32f47b GIT binary patch literal 175 zcmZQ%WMpDyVPN23zyy;%G=6{r1`ZAm9v&W`Iy4$h7b6n`6BC*kjLTP46rYn>mYJ6x YUzDGp3l(N&hRLJR>`AF9Na`T607st_J^%m! literal 0 HcmV?d00001 From 1c4353efe87e0070d978454ed1df63860da4b27f Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 11:42:21 +0400 Subject: [PATCH 2/6] feat: add Q10 live position parsing from 02 01 packets Adds parsing for the Q10 "02 01" live position packet (delivered on the same protocol-301 channel as the map, only while the robot is moving). The packet format was reverse-engineered and validated against live ss07 captures (the 18-byte-header layout documented elsewhere did NOT match this firmware): - 10-byte header (sequence counter at byte 3, then a constant type/flag). - big-endian int16 (x, y) point pairs; this firmware sends the current position as a single point per packet rather than an accumulated path. - Confirmed live: as R1 traversed the corridor, the decoded x moved from -163 to +169 with y ~0. The full saved map packet (01 01) was checked too and does NOT carry the live path (identical across captures during a clean), so position comes from 02 01. - b01_q10_map_parser: parse_trace_packet() + Q10TracePacket/Q10Point. - b01_q10_channel: request_trace() (marker-filtered). - MapContentTrait.refresh_trace() exposes path + robot_position. - cli: `q10-position` (reports gracefully when the robot is idle). - Tests use a real captured position packet + a synthetic multi-point packet. --- roborock/cli.py | 33 +++++++++++ roborock/devices/rpc/b01_q10_channel.py | 41 +++++++++++--- roborock/devices/traits/b01/q10/map.py | 28 +++++++++- roborock/map/b01_q10_map_parser.py | 62 +++++++++++++++++++++ tests/devices/traits/b01/q10/test_map.py | 22 ++++++++ tests/map/test_b01_q10_map_parser.py | 46 +++++++++++++++ tests/map/testdata/b01_q10_trace.bin | Bin 0 -> 14 bytes tests/map/testdata/b01_q10_trace_multi.bin | Bin 0 -> 22 bytes 8 files changed, 221 insertions(+), 11 deletions(-) create mode 100644 tests/map/testdata/b01_q10_trace.bin create mode 100644 tests/map/testdata/b01_q10_trace_multi.bin diff --git a/roborock/cli.py b/roborock/cli.py index 1ca8ab11..2a59e1c6 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -572,6 +572,39 @@ async def map_data(ctx, device_id: str, include_path: bool): click.echo(dump_json(data_summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.") +@click.pass_context +@async_command +async def q10_position(ctx, device_id: str, include_path: bool): + """Get the current Q10 robot position and live cleaning path. + + The Q10 only streams its position/path while it is actively cleaning, so this + will report that no live trace is available for an idle/docked robot. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + map_trait = device.b01_q10_properties.map + try: + await map_trait.refresh_trace() + except RoborockException: + click.echo("No live trace available (the robot only reports position while cleaning).") + return + position = map_trait.robot_position + summary: dict[str, Any] = { + "robot_position": {"x": position.x, "y": position.y} if position else None, + "path_points": len(map_trait.path), + } + if include_path: + summary["path"] = [[p.x, p.y] for p in map_trait.path] + click.echo(dump_json(summary)) + + @session.command() @click.option("--device_id", required=True) @click.pass_context diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 79cb5dde..a2ddd212 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -41,14 +41,14 @@ async def stream_decoded_responses( yield decoded_dps -async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the current map and return the raw ``MAP_RESPONSE`` payload. +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" - The Q10 does not have a dedicated "get map" command. Instead, requesting the - device state (``dpRequestDps``) triggers the robot to push its current map as - a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. This subscribes - for that push, sends the trigger, and resolves on the first map message. - """ + +async def _request_map_response(mqtt_channel: MqttChannel, marker: bytes, what: str, timeout: float | None) -> bytes: + """Trigger a map push and resolve on the first ``MAP_RESPONSE`` with ``marker``.""" if timeout is None: timeout = _MAP_TIMEOUT loop = asyncio.get_running_loop() @@ -57,7 +57,11 @@ async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None def on_message(message: RoborockMessage) -> None: if future.done(): return - if message.protocol == RoborockMessageProtocol.MAP_RESPONSE and message.payload: + if ( + message.protocol == RoborockMessageProtocol.MAP_RESPONSE + and message.payload + and message.payload[:2] == marker + ): future.set_result(message.payload) unsub = await mqtt_channel.subscribe(on_message) @@ -65,11 +69,30 @@ def on_message(message: RoborockMessage) -> None: await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) return await asyncio.wait_for(future, timeout=timeout) except TimeoutError as ex: - raise RoborockException(f"Timed out waiting for Q10 map after {timeout}s") from ex + raise RoborockException(f"Timed out waiting for Q10 {what} after {timeout}s") from ex finally: unsub() +async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the current map and return the raw ``01 01`` ``MAP_RESPONSE`` payload. + + The Q10 does not have a dedicated "get map" command. Instead, requesting the + device state (``dpRequestDps``) triggers the robot to push its current map as + a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. + """ + return await _request_map_response(mqtt_channel, _MAP_PACKET_MARKER, "map", timeout) + + +async def request_trace(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the live trace/path and return the raw ``02 01`` ``MAP_RESPONSE`` payload. + + The robot only emits trace packets while it is actively moving (cleaning), so + this will time out for an idle/docked robot. + """ + return await _request_map_response(mqtt_channel, _TRACE_PACKET_MARKER, "trace", timeout) + + async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index fdd8fcca..6daf2ab3 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -15,11 +15,18 @@ from vacuum_map_parser_base.map_data import MapData from roborock.data import RoborockBase -from roborock.devices.rpc.b01_q10_channel import request_map +from roborock.devices.rpc.b01_q10_channel import request_map, request_trace from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException -from roborock.map.b01_q10_map_parser import B01Q10MapParser, B01Q10MapParserConfig, Q10Room, parse_map_packet +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10Point, + Q10Room, + parse_map_packet, + parse_trace_packet, +) _TRUNCATE_LENGTH = 20 @@ -37,6 +44,12 @@ class MapContent(RoborockBase): rooms: list[Q10Room] = field(default_factory=list) """Rooms (segments) reported by the device, with ids and names.""" + path: list[Q10Point] = field(default_factory=list) + """Latest live cleaning path points (only available while the robot moves).""" + + robot_position: Q10Point | None = None + """Current robot position (the most recent trace point), if known.""" + raw_api_response: bytes | None = None """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" @@ -85,3 +98,14 @@ def parse_map_content(self) -> None: self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms + + async def refresh_trace(self) -> None: + """Fetch the live cleaning path and current robot position. + + The robot only emits trace packets while it is actively moving, so this + raises :class:`RoborockException` (timeout) for an idle/docked robot. + """ + raw_payload = await request_trace(self._channel) + trace = parse_trace_packet(raw_payload) + self.path = trace.points + self.robot_position = trace.robot_position diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index bb747959..b2f68b70 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -73,11 +73,73 @@ class Q10MapPacket: rooms: list[Q10Room] = field(default_factory=list) +@dataclass +class Q10Point: + """A single point in Q10 map/trace coordinate space.""" + + x: int + y: int + + +@dataclass +class Q10TracePacket: + """Decoded contents of a Q10 ``02 01`` live position packet. + + The robot only emits these while it is actively moving (cleaning), so an + idle/docked robot will not produce them. Observed firmware sends the current + position as a single point per packet rather than an accumulated path, so + ``points`` typically holds one point. + """ + + points: list[Q10Point] = field(default_factory=list) + sequence: int = 0 + """Header sequence counter; increments as new position packets are sent.""" + + @property + def robot_position(self) -> Q10Point | None: + """The current robot position (the most recent point).""" + return self.points[-1] if self.points else None + + +# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# point pairs. Header layout was confirmed against live ss07 captures (the +# sequence counter is at byte 3; bytes 4-9 are a constant type/flag + padding). +# NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) +# did not match this firmware -- this 10-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 10 +_TRACE_SEQUENCE_OFFSET = 3 + + def is_map_packet(payload: bytes) -> bool: """Return True if the payload is a Q10 full-map (``01 01``) packet.""" return payload[:2] == MAP_PACKET_MARKER +def is_trace_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 live trace (``02 01``) packet.""" + return payload[:2] == TRACE_PACKET_MARKER + + +def parse_trace_packet(payload: bytes) -> Q10TracePacket: + """Parse a Q10 ``02 01`` trace packet into path points + robot position.""" + if not is_trace_packet(payload): + raise RoborockException("Payload is not a Q10 trace packet") + if len(payload) < _TRACE_HEADER_LENGTH: + raise RoborockException("Q10 trace packet is shorter than its header") + body = payload[_TRACE_HEADER_LENGTH:] + if len(body) % 4: + raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + + points = [ + Q10Point( + x=int.from_bytes(body[offset : offset + 2], "big", signed=True), + y=int.from_bytes(body[offset + 2 : offset + 4], "big", signed=True), + ) + for offset in range(0, len(body), 4) + ] + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + + def lz4_block_decompress(data: bytes) -> bytes: """Decompress a raw LZ4 *block* (no frame header). diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 28c0bbea..e9d47fdf 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -12,6 +12,7 @@ from tests.fixtures.channel_fixtures import FakeChannel FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") @pytest.fixture @@ -65,3 +66,24 @@ def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: trait = _trait(fake_channel) with pytest.raises(RoborockException, match="No map payload available"): trait.parse_map_content() + + +async def test_refresh_trace_populates_path_and_position(fake_channel: FakeChannel) -> None: + """refresh_trace() parses the live position from a real ss07 trace packet.""" + fake_channel.response_queue.append(_map_message(TRACE_FIXTURE.read_bytes())) + + trait = _trait(fake_channel) + await trait.refresh_trace() + + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + + +async def test_refresh_trace_ignores_map_packets(fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch) -> None: + """A map (01 01) push must not satisfy a trace request.""" + monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) + fake_channel.response_queue.append(_map_message(FIXTURE.read_bytes())) # map, not trace + trait = _trait(fake_channel) + with pytest.raises(RoborockException, match="Timed out waiting for Q10 trace"): + await trait.refresh_trace() diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index a3ab89b1..1c4e8a61 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -9,11 +9,15 @@ B01Q10MapParser, Q10Room, is_map_packet, + is_trace_packet, lz4_block_decompress, parse_map_packet, + parse_trace_packet, ) FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" +TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" +TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" def _payload() -> bytes: @@ -79,6 +83,48 @@ def test_parse_rejects_non_map_packet() -> None: parse_map_packet(b"\x02\x01" + b"\x00" * 40) +def test_packet_markers_are_distinct() -> None: + map_payload = _payload() + trace_payload = TRACE_FIXTURE.read_bytes() + assert is_map_packet(map_payload) and not is_trace_packet(map_payload) + assert is_trace_packet(trace_payload) and not is_map_packet(trace_payload) + + +def test_parse_trace_packet_real_single_point() -> None: + """A real ss07 position packet decodes to a single current-position point.""" + trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + assert trace.sequence == 9 + assert [(p.x, p.y) for p in trace.points] == [(169, 0)] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + + +def test_parse_trace_packet_multi_point() -> None: + """A multi-point packet decodes all points; position is the most recent.""" + trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) + assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + # Signed coordinates are supported (negative x). + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) + + +def test_parse_trace_empty_path_has_no_position() -> None: + header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + trace = parse_trace_packet(header_only) + assert trace.points == [] + assert trace.robot_position is None + + +def test_parse_trace_rejects_non_trace_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 trace packet"): + parse_trace_packet(_payload()) + + +def test_parse_trace_rejects_misaligned_points() -> None: + with pytest.raises(RoborockException, match="not 4-byte"): + parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + + def test_parse_rejects_bad_layout_length() -> None: payload = bytearray(_payload()) payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer diff --git a/tests/map/testdata/b01_q10_trace.bin b/tests/map/testdata/b01_q10_trace.bin new file mode 100644 index 0000000000000000000000000000000000000000..ace261d0907cb56b8c991d10c1f2d36eb6faf38b GIT binary patch literal 14 ScmZQ#WZ-0AVgP}a3=9AQrvSJB literal 0 HcmV?d00001 diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin new file mode 100644 index 0000000000000000000000000000000000000000..8377e6c0d2b46c02bf0934e59c4c997027d424b3 GIT binary patch literal 22 bcmZQ#WME}rVgP{@h7%0a7=Haf$EX7U8@vR; literal 0 HcmV?d00001 From 57f263918e13eb194592c2434907f28e329046e0 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 12:51:28 +0400 Subject: [PATCH 3/6] fix: frame Q10 02 01 trace as full cleaning-session path Live capture (R1 corridor run) disproved the earlier 'single current point per packet' assumption: the same session emitted packets of 1, then 3, then 15 points, each a strict superset. The robot accumulates the full session path server-side and returns it whole, so a client connecting mid-session still gets the complete trail (matching the app showing it after a cold launch). The parser already read all points; this corrects the docs and adds a real 15-point fixture + test, and clarifies that byte 3 is a session counter (tracks the device clean count) not a per-packet sequence. --- roborock/devices/traits/b01/q10/map.py | 20 +++++++++---- roborock/map/b01_q10_map_parser.py | 28 +++++++++++++------ tests/map/test_b01_q10_map_parser.py | 24 +++++++++++++++- tests/map/testdata/b01_q10_trace_session.bin | Bin 0 -> 70 bytes 4 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 tests/map/testdata/b01_q10_trace_session.bin diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index 6daf2ab3..07f5c24b 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -45,10 +45,14 @@ class MapContent(RoborockBase): """Rooms (segments) reported by the device, with ids and names.""" path: list[Q10Point] = field(default_factory=list) - """Latest live cleaning path points (only available while the robot moves).""" + """Full path of the current cleaning session (oldest point first). + + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" robot_position: Q10Point | None = None - """Current robot position (the most recent trace point), if known.""" + """Current robot position (the most recent path point), if known.""" raw_api_response: bytes | None = None """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" @@ -100,10 +104,14 @@ def parse_map_content(self) -> None: self.rooms = packet.rooms async def refresh_trace(self) -> None: - """Fetch the live cleaning path and current robot position. - - The robot only emits trace packets while it is actively moving, so this - raises :class:`RoborockException` (timeout) for an idle/docked robot. + """Fetch the current session's cleaning path and robot position. + + Populates :attr:`path` with the full trajectory of the active cleaning + session (the robot accumulates it, so the whole path is returned even + when we connect mid-session) and :attr:`robot_position` with the most + recent point. The robot only emits trace packets while a session is + active, so this raises :class:`RoborockException` (timeout) for an + idle/docked robot. """ raw_payload = await request_trace(self._channel) trace = parse_trace_packet(raw_payload) diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index b2f68b70..346c53d2 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -83,17 +83,24 @@ class Q10Point: @dataclass class Q10TracePacket: - """Decoded contents of a Q10 ``02 01`` live position packet. - - The robot only emits these while it is actively moving (cleaning), so an - idle/docked robot will not produce them. Observed firmware sends the current - position as a single point per packet rather than an accumulated path, so - ``points`` typically holds one point. + """Decoded contents of a Q10 ``02 01`` cleaning-path packet. + + The robot accumulates the **full path of the current cleaning session** and + serves it in a single packet: ``points`` holds the whole trajectory so far + (oldest first), growing as the robot cleans. This was confirmed live -- a + corridor run produced packets of 1, then 3, then 15 points, each a strict + superset describing the path travelled. Because the robot keeps the path + server-side, a client that connects mid-session still receives the complete + path (this is how the app shows the trail even after a cold launch). + + The robot only emits these while a session is active, so an idle/docked robot + will not produce them. The most recent point is the current robot position. """ points: list[Q10Point] = field(default_factory=list) sequence: int = 0 - """Header sequence counter; increments as new position packets are sent.""" + """Session counter (byte 3); increments per cleaning session, tracking the + device clean count. Not a per-packet sequence.""" @property def robot_position(self) -> Q10Point | None: @@ -102,8 +109,11 @@ def robot_position(self) -> Q10Point | None: # Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) -# point pairs. Header layout was confirmed against live ss07 captures (the -# sequence counter is at byte 3; bytes 4-9 are a constant type/flag + padding). +# point pairs forming the accumulated session path. Header layout confirmed +# against live ss07 captures: byte 3 is a session counter (tracks the device +# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point +# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body +# rather than trusting the count field, so a truncated tail can't desync it. # NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) # did not match this firmware -- this 10-byte layout is what the device sent. _TRACE_HEADER_LENGTH = 10 diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index 1c4e8a61..d40a158c 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -18,6 +18,8 @@ FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" +# Real 15-point packet captured from an R1 corridor run (full session path). +TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" def _payload() -> bytes: @@ -91,7 +93,7 @@ def test_packet_markers_are_distinct() -> None: def test_parse_trace_packet_real_single_point() -> None: - """A real ss07 position packet decodes to a single current-position point.""" + """A real ss07 packet captured early in a session has a single path point.""" trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) assert trace.sequence == 9 assert [(p.x, p.y) for p in trace.points] == [(169, 0)] @@ -99,6 +101,26 @@ def test_parse_trace_packet_real_single_point() -> None: assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) +def test_parse_trace_packet_real_session_path() -> None: + """A real 15-point packet (corridor run) decodes the full accumulated path. + + Captured live from an R1: the same session emitted packets of 1, then 3, + then 15 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position. + """ + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + points = [(p.x, p.y) for p in trace.points] + assert len(points) == 15 + assert points[0] == (-34, 0) # oldest + assert points[-1] == (276, -1) # most recent == current position + # After the initial repositioning, x marches steadily down the corridor. + tail_x = [p[0] for p in points[2:]] + assert tail_x == sorted(tail_x) + assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) + + def test_parse_trace_packet_multi_point() -> None: """A multi-point packet decodes all points; position is the most recent.""" trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) diff --git a/tests/map/testdata/b01_q10_trace_session.bin b/tests/map/testdata/b01_q10_trace_session.bin new file mode 100644 index 0000000000000000000000000000000000000000..a4222e8170b649884537705401fc896ff9289d0d GIT binary patch literal 70 zcmZQ#WZ+_8Vqjq4`+pBeYBD%5h%zWJXfP-PS;`C%4Dt*)4AKk@4B`wkfOrjqFvAfB Ueuf)B{DFa+k%57kQRM%B069hr761SM literal 0 HcmV?d00001 From d45d488bb264c214dfbe02a863a34a555151863a Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 17:11:34 +0400 Subject: [PATCH 4/6] fix: allow Q10 maps without room records --- roborock/map/b01_q10_map_parser.py | 2 +- tests/map/test_b01_q10_map_parser.py | 37 ++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index 346c53d2..a56e9024 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -208,7 +208,7 @@ def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: front, so we search for the split that makes the grid rectangular and lines up with the marker. """ - for room_count in range(1, _MAX_ROOMS): + for room_count in range(0, _MAX_ROOMS + 1): room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH area = len(decoded) - room_data_length if area <= 0 or area % width: diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index d40a158c..4dbcc9bf 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -26,6 +26,33 @@ def _payload() -> bytes: return FIXTURE.read_bytes() +def _literal_lz4_block(data: bytes) -> bytes: + block = bytearray() + literal_length = len(data) + if literal_length < 15: + block.append(literal_length << 4) + else: + block.append(0xF0) + remaining = literal_length - 15 + while remaining >= 0xFF: + block.append(0xFF) + remaining -= 0xFF + block.append(remaining) + block.extend(data) + return bytes(block) + + +def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[8:10] = width.to_bytes(2, "little") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + def test_lz4_block_roundtrip_all_literals() -> None: """A simple all-literals block decodes back to the original bytes.""" original = bytes(range(60)) * 3 @@ -59,6 +86,16 @@ def test_parse_map_packet() -> None: assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] +def test_parse_map_packet_allows_zero_room_metadata() -> None: + """A map can be present before the robot has room segmentation records.""" + grid = bytes([240, 240, 249, 243, 240, 240]) + packet = parse_map_packet(_synthetic_map_payload(width=3, decoded_layout=grid + b"\x01\x00")) + assert packet.width == 3 + assert packet.height == 2 + assert packet.grid == grid + assert packet.rooms == [] + + def test_room_name_normalization() -> None: """Firmware ``rr_`` default names are normalized; custom names are titled.""" assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" From e6feafc3d345f9fe3f737e0dbbcafd578d04df93 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Mon, 15 Jun 2026 10:49:18 +0400 Subject: [PATCH 5/6] refactor: make Q10 map support fully push-driven The Q10 has no synchronous get-map command. The previous MapContentTrait faked one: refresh()/refresh_trace() sent a dpRequestDps and blocked awaiting the next MAP_RESPONSE push with a timeout. That has no request/response correlation and fights the firmware's ~60-70s push throttle. Mirror the existing Q10 StatusTrait model instead: - MapContentTrait is now a push-only TraitUpdateListener. The Q10PropertiesApi subscribe loop routes protocol-301 MAP_RESPONSE packets to update_from_map_response(), which parses the payload, updates the cached fields and notifies listeners. - Drop request_map()/request_trace() and the trait's refresh()/refresh_trace(). - CLI map-image/rooms/q10-position now nudge the device with refresh() and wait on a map-trait update listener for the pushed data. --- roborock/cli.py | 63 ++++++-- roborock/devices/rpc/b01_q10_channel.py | 66 +------- roborock/devices/traits/b01/q10/__init__.py | 40 +++-- roborock/devices/traits/b01/q10/map.py | 97 +++++++----- tests/devices/traits/b01/q10/test_map.py | 157 +++++++++++++------- 5 files changed, 256 insertions(+), 167 deletions(-) diff --git a/roborock/cli.py b/roborock/cli.py index 2a59e1c6..013719da 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -50,6 +50,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute @@ -521,6 +522,46 @@ async def maps(ctx, device_id: str): await _display_v1_trait(context, device_id, lambda v1: v1.maps) +# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to +# ~once per 60-70s, so a single request is answered quickly but rapid re-requests +# may not be. This bounds how long a one-shot CLI command waits for that push. +_Q10_MAP_PUSH_TIMEOUT = 30.0 + + +async def _await_q10_map_push( + properties: Q10PropertiesApi, + predicate: Callable[[], bool], + *, + timeout: float = _Q10_MAP_PUSH_TIMEOUT, +) -> bool: + """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. + + The Q10 map API is entirely push-driven: there is no synchronous get-map + request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, + which the device's subscribe loop feeds into the map trait. Here we register + an update listener, send the request, and wait for the pushed data to satisfy + ``predicate``. Returns whether it did within ``timeout``. + """ + if predicate(): + return True + loop = asyncio.get_running_loop() + updated: asyncio.Future[None] = loop.create_future() + + def on_update() -> None: + if predicate() and not updated.done(): + updated.set_result(None) + + unsub = properties.map.add_update_listener(on_update) + try: + await properties.refresh() + await asyncio.wait_for(updated, timeout=timeout) + return True + except TimeoutError: + return False + finally: + unsub() + + @session.command() @click.option("--device_id", required=True) @click.option("--output-file", required=True, help="Path to save the map image.") @@ -532,9 +573,9 @@ async def map_image(ctx, device_id: str, output_file: str): device_manager = await context.get_device_manager() device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: - map_trait = device.b01_q10_properties.map - await map_trait.refresh() - image_content = map_trait.image_content + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + image_content = properties.map.image_content else: v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) image_content = v1_trait.image_content @@ -589,12 +630,12 @@ async def q10_position(ctx, device_id: str, include_path: bool): if device.b01_q10_properties is None: click.echo("Feature not supported by device") return - map_trait = device.b01_q10_properties.map - try: - await map_trait.refresh_trace() - except RoborockException: + properties = device.b01_q10_properties + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path)) + if not got_trace: click.echo("No live trace available (the robot only reports position while cleaning).") return + map_trait = properties.map position = map_trait.robot_position summary: dict[str, Any] = { "robot_position": {"x": position.x, "y": position.y} if position else None, @@ -749,9 +790,11 @@ async def rooms(ctx, device_id: str): device_manager = await context.get_device_manager() device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: - map_trait = device.b01_q10_properties.map - await map_trait.refresh() - click.echo(dump_json({room.id: room.name for room in map_trait.rooms})) + properties = device.b01_q10_properties + # A valid map may have no room records, so wait on the map arriving + # (image_content) rather than on rooms being non-empty. + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) else: await _display_v1_trait(context, device_id, lambda v1: v1.rooms) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index a2ddd212..50ec8e5a 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -1,6 +1,5 @@ """Thin wrapper around the MQTT channel for Roborock B01 Q10 devices.""" -import asyncio import logging from collections.abc import AsyncGenerator from typing import Any @@ -13,20 +12,19 @@ decode_rpc_response, encode_mqtt_payload, ) -from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) -# Requesting the device state (dpRequestDps) also makes the robot push its current -# map as a separate MAP_RESPONSE message a few seconds later. Q10 firmware throttles -# these pushes (~60-70s between maps), so callers should not poll tightly. -_MAP_TIMEOUT = 20.0 - async def stream_decoded_responses( mqtt_channel: MqttChannel, ) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT.""" + """Stream decoded DPS messages received via MQTT. + + Messages that are not decodable DPS responses (e.g. protocol-301 + ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw + messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. + """ async for response_message in mqtt_channel.subscribe_stream(): try: @@ -41,58 +39,6 @@ async def stream_decoded_responses( yield decoded_dps -# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the -# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). -_MAP_PACKET_MARKER = b"\x01\x01" -_TRACE_PACKET_MARKER = b"\x02\x01" - - -async def _request_map_response(mqtt_channel: MqttChannel, marker: bytes, what: str, timeout: float | None) -> bytes: - """Trigger a map push and resolve on the first ``MAP_RESPONSE`` with ``marker``.""" - if timeout is None: - timeout = _MAP_TIMEOUT - loop = asyncio.get_running_loop() - future: asyncio.Future[bytes] = loop.create_future() - - def on_message(message: RoborockMessage) -> None: - if future.done(): - return - if ( - message.protocol == RoborockMessageProtocol.MAP_RESPONSE - and message.payload - and message.payload[:2] == marker - ): - future.set_result(message.payload) - - unsub = await mqtt_channel.subscribe(on_message) - try: - await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) - return await asyncio.wait_for(future, timeout=timeout) - except TimeoutError as ex: - raise RoborockException(f"Timed out waiting for Q10 {what} after {timeout}s") from ex - finally: - unsub() - - -async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the current map and return the raw ``01 01`` ``MAP_RESPONSE`` payload. - - The Q10 does not have a dedicated "get map" command. Instead, requesting the - device state (``dpRequestDps``) triggers the robot to push its current map as - a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. - """ - return await _request_map_response(mqtt_channel, _MAP_PACKET_MARKER, "map", timeout) - - -async def request_trace(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the live trace/path and return the raw ``02 01`` ``MAP_RESPONSE`` payload. - - The robot only emits trace packets while it is actively moving (cleaning), so - this will time out for an idle/docked robot. - """ - return await _request_map_response(mqtt_channel, _TRACE_PACKET_MARKER, "trace", timeout) - - async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 27b2af04..791cdd0b 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,9 +4,11 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.roborock_message import RoborockMessage from .command import CommandTrait from .map import MapContentTrait @@ -47,7 +49,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() - self.map = MapContentTrait(channel) + self.map = MapContentTrait() self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: @@ -71,14 +73,32 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop to listen for status updates.""" - async for decoded_dps in stream_decoded_responses(self._channel): - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + """Persistent loop dispatching pushed messages to the read-model traits.""" + async for message in self._channel.subscribe_stream(): + self._handle_message(message) + + def _handle_message(self, message: RoborockMessage) -> None: + """Route a single pushed message to the trait responsible for it. + + Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not + DPS), so they are handled separately from the status DPS stream. The Q10 + is entirely push-driven: there is no synchronous get-map request, the + device just publishes its current map (a ``dpRequestDps`` nudges it to). + """ + if self.map.update_from_map_response(message): + return + + try: + decoded_dps = decode_rpc_response(message) + except RoborockException as ex: + _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) + return + + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + # Notify all traits about a new message and each trait will + # only update what fields that it is responsible for. + # More traits can be added here below. + self.status.update_from_dps(decoded_dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index 07f5c24b..8cbb5d9e 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -1,23 +1,27 @@ """Map content trait for B01 Q10 devices. -This mirrors the v1 / Q7 ``MapContentTrait`` contract: -- ``refresh()`` performs I/O and populates cached fields. -- ``parse_map_content()`` reparses cached raw bytes without I/O. -- ``image_content``, ``map_data``, ``rooms`` and ``raw_api_response`` are readable. +Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this +trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: + +- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` + messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` + subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. +- ``update_from_map_response`` parses the payload, updates the cached fields and + notifies update listeners (register via :meth:`add_update_listener`). +- ``parse_map_content()`` reparses the cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and + ``raw_api_response`` are readable and reflect the most recently pushed map. Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. -The raw payload is retrieved by :func:`request_map`, which triggers the device -to push its current map. """ +import logging from dataclasses import dataclass, field from vacuum_map_parser_base.map_data import MapData from roborock.data import RoborockBase -from roborock.devices.rpc.b01_q10_channel import request_map, request_trace -from roborock.devices.traits import Trait -from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.devices.traits.common import TraitUpdateListener from roborock.exceptions import RoborockException from roborock.map.b01_q10_map_parser import ( B01Q10MapParser, @@ -27,9 +31,17 @@ parse_map_packet, parse_trace_packet, ) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +_LOGGER = logging.getLogger(__name__) _TRUNCATE_LENGTH = 20 +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" + @dataclass class MapContent(RoborockBase): @@ -64,29 +76,61 @@ def __repr__(self) -> str: return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" -class MapContentTrait(MapContent, Trait): - """Trait for fetching parsed map content for Q10 devices.""" +class MapContentTrait(MapContent, TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. + + The Q10 has no synchronous get-map request; the device pushes map and trace + packets, which the ``Q10PropertiesApi`` subscribe loop feeds into + :meth:`update_from_map_response`. Consumers read the cached fields and/or + register a callback with :meth:`add_update_listener` to be notified when new + map content arrives. + """ def __init__( self, - channel: MqttChannel, *, map_parser_config: B01Q10MapParserConfig | None = None, ) -> None: super().__init__() - self._channel = channel + TraitUpdateListener.__init__(self, logger=_LOGGER) self._map_parser = B01Q10MapParser(map_parser_config) - async def refresh(self) -> None: - """Fetch, decode, and parse the current map payload.""" - raw_payload = await request_map(self._channel) - self.raw_api_response = raw_payload - self.parse_map_content() + def update_from_map_response(self, message: RoborockMessage) -> bool: + """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. + + Returns ``True`` if the message was a recognized Q10 map (``01 01``) or + trace (``02 01``) packet (so the caller can stop processing it), and + ``False`` otherwise. Update listeners are notified only when a packet is + parsed successfully. + """ + if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: + return False + marker = message.payload[:2] + if marker == _MAP_PACKET_MARKER: + self.raw_api_response = message.payload + try: + self.parse_map_content() + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) + return True + self._notify_update() + return True + if marker == _TRACE_PACKET_MARKER: + try: + trace = parse_trace_packet(message.payload) + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) + return True + self.path = trace.points + self.robot_position = trace.robot_position + self._notify_update() + return True + return False def parse_map_content(self) -> None: """Reparse the cached raw map payload without performing any I/O.""" if self.raw_api_response is None: - raise RoborockException("No map payload available; call refresh() first") + raise RoborockException("No map payload available; no map has been pushed yet") try: parsed = self._map_parser.parse(self.raw_api_response) @@ -102,18 +146,3 @@ def parse_map_content(self) -> None: self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms - - async def refresh_trace(self) -> None: - """Fetch the current session's cleaning path and robot position. - - Populates :attr:`path` with the full trajectory of the active cleaning - session (the robot accumulates it, so the whole path is returned even - when we connect mid-session) and :attr:`robot_position` with the most - recent point. The robot only emits trace packets while a session is - active, so this raises :class:`RoborockException` (timeout) for an - idle/docked robot. - """ - raw_payload = await request_trace(self._channel) - trace = parse_trace_packet(raw_payload) - self.path = trace.points - self.robot_position = trace.robot_position diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index e9d47fdf..702d5693 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -1,89 +1,140 @@ -"""Tests for the Q10 B01 map content trait.""" +"""Tests for the Q10 B01 map content trait. +The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages +and the trait updates its cached state from them via ``update_from_map_response`` +(there is no synchronous get-map request). +""" + +import asyncio +from collections.abc import AsyncGenerator from pathlib import Path -from typing import cast +from unittest.mock import AsyncMock, Mock import pytest +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait -from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol -from tests.fixtures.channel_fixtures import FakeChannel FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") -@pytest.fixture -def fake_channel() -> FakeChannel: - return FakeChannel() - - -def _trait(channel: FakeChannel) -> MapContentTrait: - return MapContentTrait(cast(MqttChannel, channel)) - +def _map_message( + payload: bytes, protocol: RoborockMessageProtocol = RoborockMessageProtocol.MAP_RESPONSE +) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") -def _map_message(payload: bytes) -> RoborockMessage: - return RoborockMessage( - protocol=RoborockMessageProtocol.MAP_RESPONSE, - payload=payload, - version=b"B01", - ) - -async def test_map_refresh_populates_image_and_rooms(fake_channel: FakeChannel) -> None: - """refresh() triggers the device push, then parses the map payload.""" +def test_update_from_map_response_populates_image_and_rooms() -> None: + """A pushed 01 01 map packet populates the image, rooms and map data.""" payload = FIXTURE.read_bytes() - fake_channel.response_queue.append(_map_message(payload)) + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) - trait = _trait(fake_channel) - await trait.refresh() + assert trait.update_from_map_response(_map_message(payload)) is True assert trait.raw_api_response == payload assert trait.image_content is not None assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} assert trait.map_data is not None + assert len(updates) == 1 - # The refresh trigger is a dpRequestDps (code 102) request. - assert len(fake_channel.published_messages) == 1 - trigger = fake_channel.published_messages[0].payload - assert trigger is not None and b'"102"' in trigger +def test_update_from_map_response_populates_path_and_position() -> None: + """A pushed 02 01 trace packet populates the path and robot position.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) -async def test_map_refresh_times_out_without_response( - fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch -) -> None: - """If the device never pushes a map, refresh raises a clear error.""" - monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) - trait = _trait(fake_channel) # no queued response -> times out - with pytest.raises(RoborockException, match="Timed out waiting for Q10 map"): - await trait.refresh() + assert trait.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) is True + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert len(updates) == 1 -def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: - trait = _trait(fake_channel) + +def test_update_from_map_response_ignores_non_map_messages() -> None: + """Non-MAP_RESPONSE messages are left for the status path to handle.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) + assert trait.update_from_map_response(rpc) is False + + # An unrecognized MAP_RESPONSE marker is also not consumed. + assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False + + assert trait.image_content is None + assert not trait.path + assert not updates + + +def test_parse_without_data_raises() -> None: + trait = MapContentTrait() with pytest.raises(RoborockException, match="No map payload available"): trait.parse_map_content() -async def test_refresh_trace_populates_path_and_position(fake_channel: FakeChannel) -> None: - """refresh_trace() parses the live position from a real ss07 trace packet.""" - fake_channel.response_queue.append(_map_message(TRACE_FIXTURE.read_bytes())) +# --- Integration through the Q10PropertiesApi subscribe loop ----------------- - trait = _trait(fake_channel) - await trait.refresh_trace() - assert [(p.x, p.y) for p in trait.path] == [(169, 0)] - assert trait.robot_position is not None - assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_channel(message_queue: asyncio.Queue[RoborockMessage]) -> AsyncMock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + + channel = AsyncMock() + channel.subscribe_stream = Mock(return_value=mock_stream()) + return channel + + +@pytest.fixture +async def q10_api(mock_channel: AsyncMock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def _wait_for(predicate, timeout: float = 2.0) -> None: + async with asyncio.timeout(timeout): + while not predicate(): + await asyncio.sleep(0.01) + + +async def test_subscribe_loop_routes_map_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A map pushed onto the stream is routed to the map trait by the loop.""" + assert q10_api.map.image_content is None + + message_queue.put_nowait(_map_message(FIXTURE.read_bytes())) + + await _wait_for(lambda: q10_api.map.image_content is not None) + assert {room.id: room.name for room in q10_api.map.rooms} == {2: "Living Room", 3: "Bedroom"} + + +async def test_subscribe_loop_routes_trace_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A trace pushed onto the stream is routed to the map trait by the loop.""" + assert not q10_api.map.path + message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) -async def test_refresh_trace_ignores_map_packets(fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch) -> None: - """A map (01 01) push must not satisfy a trace request.""" - monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) - fake_channel.response_queue.append(_map_message(FIXTURE.read_bytes())) # map, not trace - trait = _trait(fake_channel) - with pytest.raises(RoborockException, match="Timed out waiting for Q10 trace"): - await trait.refresh_trace() + await _wait_for(lambda: bool(q10_api.map.path)) + assert q10_api.map.robot_position is not None From 8797ede570262cd944e3927facedbeac05efadec Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:02:09 +0400 Subject: [PATCH 6/6] fix: tighten Q10 map CLI push handling --- roborock/cli.py | 24 ++++++---- tests/devices/traits/b01/q10/test_map.py | 59 ++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/roborock/cli.py b/roborock/cli.py index 013719da..fa4ff2aa 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -533,17 +533,16 @@ async def _await_q10_map_push( predicate: Callable[[], bool], *, timeout: float = _Q10_MAP_PUSH_TIMEOUT, + allow_cached_on_timeout: bool = False, ) -> bool: - """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. + """Nudge a Q10 to push its map/trace and wait for a fresh update. The Q10 map API is entirely push-driven: there is no synchronous get-map request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, which the device's subscribe loop feeds into the map trait. Here we register - an update listener, send the request, and wait for the pushed data to satisfy - ``predicate``. Returns whether it did within ``timeout``. + an update listener, send the request, and wait for a newly pushed update to + satisfy ``predicate``. Returns whether it did within ``timeout``. """ - if predicate(): - return True loop = asyncio.get_running_loop() updated: asyncio.Future[None] = loop.create_future() @@ -557,7 +556,7 @@ def on_update() -> None: await asyncio.wait_for(updated, timeout=timeout) return True except TimeoutError: - return False + return allow_cached_on_timeout and predicate() finally: unsub() @@ -574,7 +573,11 @@ async def map_image(ctx, device_id: str, output_file: str): device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: properties = device.b01_q10_properties - await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) image_content = properties.map.image_content else: v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) @@ -793,7 +796,11 @@ async def rooms(ctx, device_id: str): properties = device.b01_q10_properties # A valid map may have no room records, so wait on the map arriving # (image_content) rather than on rooms being non-empty. - await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) else: await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @@ -1285,6 +1292,7 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur cli.add_command(maps) cli.add_command(map_image) cli.add_command(map_data) +cli.add_command(q10_position) cli.add_command(consumables) cli.add_command(reset_consumable) cli.add_command(rooms) diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 702d5693..8b3be279 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -12,9 +12,11 @@ import pytest +from roborock.cli import _await_q10_map_push, cli from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import Q10Point from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") @@ -81,6 +83,63 @@ def test_parse_without_data_raises() -> None: trait.parse_map_content() +def test_q10_position_is_available_as_top_level_cli_command() -> None: + assert "q10-position" in cli.commands + + +# --- CLI push waiting -------------------------------------------------------- + + +class _FakeQ10Properties: + def __init__(self) -> None: + self.map = MapContentTrait() + self.refresh_count = 0 + + async def refresh(self) -> None: + self.refresh_count += 1 + + +class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): + async def refresh(self) -> None: + await super().refresh() + self.map.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) + + +async def test_await_q10_map_push_waits_for_fresh_update() -> None: + """A cached trace alone is not treated as a successful new map push.""" + properties = _FakeQ10Properties() + properties.map.path = [Q10Point(1, 2)] + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is False + assert properties.refresh_count == 1 + + +async def test_await_q10_map_push_returns_true_after_update() -> None: + properties = _FakeQ10PropertiesWithTrace() + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is True + assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] + + +async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: + properties = _FakeQ10Properties() + properties.map.image_content = b"cached-png" + + got_map = await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + timeout=0.01, + allow_cached_on_timeout=True, + ) + + assert got_map is True + assert properties.refresh_count == 1 + + # --- Integration through the Q10PropertiesApi subscribe loop -----------------