Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 103 additions & 4 deletions roborock/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from roborock.devices.device import RoborockDevice
from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager
from roborock.devices.traits import Trait
from roborock.devices.traits.b01.q10 import Q10PropertiesApi
from roborock.devices.traits.b01.q10.vacuum import VacuumTrait
from roborock.devices.traits.v1 import V1TraitMixin
from roborock.devices.traits.v1.consumeable import ConsumableAttribute
Expand Down Expand Up @@ -521,6 +522,45 @@ async def maps(ctx, device_id: str):
await _display_v1_trait(context, device_id, lambda v1: v1.maps)


# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to
# ~once per 60-70s, so a single request is answered quickly but rapid re-requests
# may not be. This bounds how long a one-shot CLI command waits for that push.
_Q10_MAP_PUSH_TIMEOUT = 30.0


async def _await_q10_map_push(
properties: Q10PropertiesApi,
predicate: Callable[[], bool],
*,
timeout: float = _Q10_MAP_PUSH_TIMEOUT,
allow_cached_on_timeout: bool = False,
) -> bool:
"""Nudge a Q10 to push its map/trace and wait for a fresh update.

The Q10 map API is entirely push-driven: there is no synchronous get-map
request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``,
which the device's subscribe loop feeds into the map trait. Here we register
an update listener, send the request, and wait for a newly pushed update to
satisfy ``predicate``. Returns whether it did within ``timeout``.
"""
loop = asyncio.get_running_loop()
updated: asyncio.Future[None] = loop.create_future()

def on_update() -> None:
if predicate() and not updated.done():
updated.set_result(None)

unsub = properties.map.add_update_listener(on_update)
try:
await properties.refresh()
await asyncio.wait_for(updated, timeout=timeout)
return True
except TimeoutError:
return allow_cached_on_timeout and predicate()
finally:
unsub()


@session.command()
@click.option("--device_id", required=True)
@click.option("--output-file", required=True, help="Path to save the map image.")
Expand All @@ -529,10 +569,22 @@ async def maps(ctx, device_id: str):
async def map_image(ctx, device_id: str, output_file: str):
"""Get device map image and save it to a file."""
context: RoborockContext = ctx.obj
trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content)
if trait.image_content:
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is not None:
properties = device.b01_q10_properties
await _await_q10_map_push(
properties,
lambda: properties.map.image_content is not None,
allow_cached_on_timeout=True,
)
image_content = properties.map.image_content
else:
v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content)
image_content = v1_trait.image_content
if image_content:
with open(output_file, "wb") as f:
f.write(trait.image_content)
f.write(image_content)
click.echo(f"Map image saved to {output_file}")
else:
click.echo("No map image content available.")
Expand Down Expand Up @@ -564,6 +616,39 @@ async def map_data(ctx, device_id: str, include_path: bool):
click.echo(dump_json(data_summary))


@session.command()
@click.option("--device_id", required=True)
@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.")
@click.pass_context
@async_command
async def q10_position(ctx, device_id: str, include_path: bool):
"""Get the current Q10 robot position and live cleaning path.

The Q10 only streams its position/path while it is actively cleaning, so this
will report that no live trace is available for an idle/docked robot.
"""
context: RoborockContext = ctx.obj
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is None:
click.echo("Feature not supported by device")
return
properties = device.b01_q10_properties
got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path))
if not got_trace:
click.echo("No live trace available (the robot only reports position while cleaning).")
return
map_trait = properties.map
position = map_trait.robot_position
summary: dict[str, Any] = {
"robot_position": {"x": position.x, "y": position.y} if position else None,
"path_points": len(map_trait.path),
}
if include_path:
summary["path"] = [[p.x, p.y] for p in map_trait.path]
click.echo(dump_json(summary))


@session.command()
@click.option("--device_id", required=True)
@click.pass_context
Expand Down Expand Up @@ -705,7 +790,20 @@ async def set_child_lock(ctx, device_id: str, enabled: bool):
async def rooms(ctx, device_id: str):
"""Get device room mapping info."""
context: RoborockContext = ctx.obj
await _display_v1_trait(context, device_id, lambda v1: v1.rooms)
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is not None:
properties = device.b01_q10_properties
# A valid map may have no room records, so wait on the map arriving
# (image_content) rather than on rooms being non-empty.
await _await_q10_map_push(
properties,
lambda: properties.map.image_content is not None,
allow_cached_on_timeout=True,
)
click.echo(dump_json({room.id: room.name for room in properties.map.rooms}))
else:
await _display_v1_trait(context, device_id, lambda v1: v1.rooms)


@session.command()
Expand Down Expand Up @@ -1194,6 +1292,7 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur
cli.add_command(maps)
cli.add_command(map_image)
cli.add_command(map_data)
cli.add_command(q10_position)
cli.add_command(consumables)
cli.add_command(reset_consumable)
cli.add_command(rooms)
Expand Down
7 changes: 6 additions & 1 deletion roborock/devices/rpc/b01_q10_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
async def stream_decoded_responses(
mqtt_channel: MqttChannel,
) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]:
"""Stream decoded DPS messages received via MQTT."""
"""Stream decoded DPS messages received via MQTT.

Messages that are not decodable DPS responses (e.g. protocol-301
``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw
messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly.
"""

async for response_message in mqtt_channel.subscribe_stream():
try:
Expand Down
44 changes: 35 additions & 9 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
import logging

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import decode_rpc_response
from roborock.roborock_message import RoborockMessage

from .command import CommandTrait
from .map import MapContentTrait
from .remote import RemoteTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

__all__ = [
"Q10PropertiesApi",
"MapContentTrait",
]

_LOGGER = logging.getLogger(__name__)
Expand All @@ -35,13 +39,17 @@ class Q10PropertiesApi(Trait):
remote: RemoteTrait
"""Trait for sending remote control related commands to Q10 devices."""

map: MapContentTrait
"""Trait for fetching the current parsed map (image + rooms)."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.remote = RemoteTrait(self.command)
self.status = StatusTrait()
self.map = MapContentTrait()
self._subscribe_task: asyncio.Task[None] | None = None

async def start(self) -> None:
Expand All @@ -65,14 +73,32 @@ async def refresh(self) -> None:
await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

async def _subscribe_loop(self) -> None:
"""Persistent loop to listen for status updates."""
async for decoded_dps in stream_decoded_responses(self._channel):
_LOGGER.debug("Received Q10 status update: %s", decoded_dps)

# Notify all traits about a new message and each trait will
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)
"""Persistent loop dispatching pushed messages to the read-model traits."""
async for message in self._channel.subscribe_stream():
self._handle_message(message)

def _handle_message(self, message: RoborockMessage) -> None:
"""Route a single pushed message to the trait responsible for it.

Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not
DPS), so they are handled separately from the status DPS stream. The Q10
is entirely push-driven: there is no synchronous get-map request, the
device just publishes its current map (a ``dpRequestDps`` nudges it to).
"""
if self.map.update_from_map_response(message):
return

try:
decoded_dps = decode_rpc_response(message)
except RoborockException as ex:
_LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex)
return

_LOGGER.debug("Received Q10 status update: %s", decoded_dps)
# Notify all traits about a new message and each trait will
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
148 changes: 148 additions & 0 deletions roborock/devices/traits/b01/q10/map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Map content trait for B01 Q10 devices.

Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this
trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract:

- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE``
messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi``
subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`.
- ``update_from_map_response`` parses the payload, updates the cached fields and
notifies update listeners (register via :meth:`add_update_listener`).
- ``parse_map_content()`` reparses the cached raw bytes without I/O.
- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and
``raw_api_response`` are readable and reflect the most recently pushed map.

Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required.
"""

import logging
from dataclasses import dataclass, field

from vacuum_map_parser_base.map_data import MapData

from roborock.data import RoborockBase
from roborock.devices.traits.common import TraitUpdateListener
from roborock.exceptions import RoborockException
from roborock.map.b01_q10_map_parser import (
B01Q10MapParser,
B01Q10MapParserConfig,
Q10Point,
Q10Room,
parse_map_packet,
parse_trace_packet,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol

_LOGGER = logging.getLogger(__name__)

_TRUNCATE_LENGTH = 20

# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the
# packet kind: a full map (``01 01``) or a live trace/path (``02 01``).
_MAP_PACKET_MARKER = b"\x01\x01"
_TRACE_PACKET_MARKER = b"\x02\x01"


@dataclass
class MapContent(RoborockBase):
"""Dataclass representing Q10 map content."""

image_content: bytes | None = None
"""The rendered image of the map in PNG format."""

map_data: MapData | None = None
"""Parsed map data (image metadata + room names)."""

rooms: list[Q10Room] = field(default_factory=list)
"""Rooms (segments) reported by the device, with ids and names."""

path: list[Q10Point] = field(default_factory=list)
"""Full path of the current cleaning session (oldest point first).

The robot accumulates this server-side and serves the whole trajectory so
far in one packet, so it is complete even if we connect mid-session. Only
populated while a cleaning session is active."""

robot_position: Q10Point | None = None
"""Current robot position (the most recent path point), if known."""

raw_api_response: bytes | None = None
"""Raw bytes of the map payload from the device (opaque blob for re-parsing)."""

def __repr__(self) -> str:
img = self.image_content
if img and len(img) > _TRUNCATE_LENGTH:
img = img[: _TRUNCATE_LENGTH - 3] + b"..."
return f"MapContent(image_content={img!r}, rooms={self.rooms!r})"


class MapContentTrait(MapContent, TraitUpdateListener):
"""Trait holding the most recently pushed parsed map content for Q10 devices.

The Q10 has no synchronous get-map request; the device pushes map and trace
packets, which the ``Q10PropertiesApi`` subscribe loop feeds into
:meth:`update_from_map_response`. Consumers read the cached fields and/or
register a callback with :meth:`add_update_listener` to be notified when new
map content arrives.
"""

def __init__(
self,
*,
map_parser_config: B01Q10MapParserConfig | None = None,
) -> None:
super().__init__()
TraitUpdateListener.__init__(self, logger=_LOGGER)
self._map_parser = B01Q10MapParser(map_parser_config)

def update_from_map_response(self, message: RoborockMessage) -> bool:
"""Update cached map/trace state from a pushed ``MAP_RESPONSE`` message.

Returns ``True`` if the message was a recognized Q10 map (``01 01``) or
trace (``02 01``) packet (so the caller can stop processing it), and
``False`` otherwise. Update listeners are notified only when a packet is
parsed successfully.
"""
if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload:
return False
marker = message.payload[:2]
if marker == _MAP_PACKET_MARKER:
self.raw_api_response = message.payload
try:
self.parse_map_content()
except RoborockException as ex:
_LOGGER.debug("Failed to parse Q10 map packet: %s", ex)
return True
self._notify_update()
return True
if marker == _TRACE_PACKET_MARKER:
try:
trace = parse_trace_packet(message.payload)
except RoborockException as ex:
_LOGGER.debug("Failed to parse Q10 trace packet: %s", ex)
return True
self.path = trace.points
self.robot_position = trace.robot_position
self._notify_update()
return True
return False

def parse_map_content(self) -> None:
"""Reparse the cached raw map payload without performing any I/O."""
if self.raw_api_response is None:
raise RoborockException("No map payload available; no map has been pushed yet")

try:
parsed = self._map_parser.parse(self.raw_api_response)
packet = parse_map_packet(self.raw_api_response)
except RoborockException:
raise
except Exception as ex:
raise RoborockException("Failed to parse Q10 map data") from ex

if parsed.image_content is None:
raise RoborockException("Failed to render Q10 map image")

self.image_content = parsed.image_content
self.map_data = parsed.map_data
self.rooms = packet.rooms
Loading